Compare commits

..

34 Commits

Author SHA1 Message Date
Devin AI
8b52e05006 feat: update LiteLLM dependency to >=1.77.4 to address CVEs
- Updated LiteLLM from ==1.74.9 to >=1.77.4
- Added test to verify LiteLLM integration works with new version
- Addresses security vulnerabilities mentioned in issue #3602

Co-Authored-By: João <joao@crewai.com>
2025-09-26 15:35:48 +00:00
Greyson LaLonde
12fa7e2ff1 fix: rename watson to watsonx embedding provider and prefix env vars
- prefix provider env vars with embeddings_  
- rename watson → watsonx in providers  
- add deprecation warning and alias for legacy 'watson' key (to be removed in v1.0.0)
2025-09-26 10:57:18 -04:00
Greyson LaLonde
091d1267d8 fix: prefix embedding provider env vars with EMBEDDINGS_ 2025-09-26 10:50:45 -04:00
Lorenze Jay
b5b10a8cde chore: update version and dependencies to 0.201.0 (#3593)
- Bump CrewAI version to 0.201.0 in __init__.py
- Update dependency versions in pyproject.toml for crew, flow, and tool templates to require CrewAI 0.201.0
- Remove unnecessary blank line in pyproject.toml
2025-09-25 18:04:12 -07:00
Greyson LaLonde
2485ed93d6 feat: upgrade chromadb to v1.1.0, improve types
- update imports and include handling for chromadb v1.1.0  
- fix mypy and typing_compat issues (required, typeddict, voyageai)  
- refine embedderconfig typing and allow base provider instances  
- handle mem0 as special case for external memory storage  
- bump tools and clean up redundant deps
2025-09-25 20:48:37 -04:00
Greyson LaLonde
ce5ea9be6f feat: add custom embedding types and migrate providers
- introduce baseembeddingsprovider and helper for embedding functions  
- add core embedding types and migrate providers, factory, and storage modules  
- remove unused type aliases and fix pydantic schema error  
- update providers with env var support and related fixes
2025-09-25 18:28:39 -04:00
Greyson LaLonde
e070c1400c feat: update pydantic, add pydantic-settings, migrate to dependency-groups
- Add pydantic-settings>=2.10.1 dependency for configuration management
- Update pydantic to 2.11.9 and python-dotenv to 1.1.1
- Migrate from deprecated tool.uv.dev-dependencies to dependency-groups.dev format
- Remove unnecessary dev dependencies: pillow, cairosvg
- Update all dev tooling to latest versions
- Remove duplicate python-dotenv from dev dependencies
2025-09-24 14:42:18 -04:00
Greyson LaLonde
6537e3737d fix: correct directory name in quickstart documentation 2025-09-24 11:41:33 -04:00
Greyson LaLonde
346faf229f feat: add pydantic-compatible import validation and deprecate old utilities 2025-09-24 11:36:02 -04:00
Lorenze Jay
a0b757a12c Lorenze/traces mark as failed (#3586)
* marking trace batch as failed if it's failed

* fix test
2025-09-23 22:02:27 -07:00
Greyson LaLonde
1dbe8aab52 fix: add batch_size support to prevent embedder token limit errors
- add batch_size field to baseragconfig (default=100)  
- update chromadb/qdrant clients and factories to use batch_size  
- extract and filter batch_size from embedder config in knowledgestorage  
- fix large csv files exceeding embedder token limits (#3574)  
- remove unneeded conditional for type  

Co-authored-by: Vini Brasil <vini@hey.com>
2025-09-24 00:05:43 -04:00
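As a rough sketch of what this change enables (the exact keys accepted are inferred from the commit notes above, not a verified API reference), `batch_size` rides along inside the embedder config so large knowledge sources are embedded in smaller requests:

```python
from crewai import Agent, Crew, Task

# Embedder configuration carrying the batch_size knob (default 100 per the
# commit). Lowering it keeps each embedding request under the provider's token
# limit when ingesting large CSV knowledge sources.
embedder_config = {
    "provider": "openai",
    "config": {
        "model": "text-embedding-3-small",
        "batch_size": 100,
    },
}

researcher = Agent(
    role="Researcher",
    goal="Answer questions from the knowledge base",
    backstory="A careful analyst",
)
task = Task(
    description="Summarize the key findings",
    expected_output="A short summary",
    agent=researcher,
)

crew = Crew(agents=[researcher], tasks=[task], embedder=embedder_config)
```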
Greyson LaLonde
4ac65eb0a6 fix: support nested config format for embedder configuration
- support nested config format with embedderconfig typeddict  
- fix parsing for model/model_name compatibility  
- add validation, typing_extensions, and improved type hints  
- enhance embedding factory with env var injection and provider support  
- add tests for openai, azure, and all embedding providers  
- misc fixes: test file rename, updated mocking patterns
2025-09-23 11:57:46 -04:00
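For orientation, a hedged sketch of the nested format described above (inferred from the commit notes; check the `EmbedderConfig` definition for the exact schema):

```python
# Nested format: provider-specific options live under a "config" key.
nested_embedder = {
    "provider": "openai",
    "config": {"model": "text-embedding-3-small"},
}

# Per the commit, "model_name" is parsed as an alias for "model", so legacy
# configs written this way should keep working.
legacy_embedder = {
    "provider": "openai",
    "config": {"model_name": "text-embedding-3-small"},
}
```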
Greyson LaLonde
3e97393f58 chore: improve typing and consolidate utilities
- add type annotations across utility modules  
- refactor printer system, agent utils, and imports for consistency  
- remove unused modules, constants, and redundant patterns  
- improve runtime type checks, exception handling, and guardrail validation  
- standardize warning suppression and logging utilities  
- fix llm typing, threading/typing edge cases, and test behavior
2025-09-23 11:33:46 -04:00
Heitor Carvalho
34bed359a6 feat: add crewai uv wrapper for uv commands (#3581) 2025-09-23 10:55:15 -04:00
Tony Kipkemboi
feeed505bb docs(changelog): add releases 0.193.2, 0.193.1, 0.193.0, 0.186.1, 0.186.0 across en/ko/pt-BR (#3577)
2025-09-22 16:19:55 -07:00
Greyson LaLonde
cb0efd05b4 chore: fix ruff linting issues in tools module
linting, args_schema default, and validator check
2025-09-22 13:13:23 -04:00
Greyson LaLonde
db5f565dea fix: apply ruff linting fixes to tasks module 2025-09-22 13:09:53 -04:00
Greyson LaLonde
58413b663a chore: fix ruff linting issues in rag module
linting, list embedding handling, and test update
2025-09-22 13:06:22 -04:00
Greyson LaLonde
37636f0dd7 chore: fix ruff linting and mypy issues in flow module 2025-09-22 13:03:06 -04:00
Greyson LaLonde
0e370593f1 chore: resolve all ruff and mypy issues in experimental module
resolve linting, typing, and import issues; update Okta test
2025-09-22 12:56:28 -04:00
Vini Brasil
aa8dc9d77f Add source to LLM Guardrail events (#3572)
This commit adds the source attribute to LLM Guardrail event calls to
identify the Lite Agent or Task that executed the guardrail.
2025-09-22 11:58:00 +09:00
Jonathan Hill
9c1096dbdc fix: Make 'ready' parameter optional in _create_reasoning_plan function (#3561)
* fix: Make 'ready' parameter optional in _create_reasoning_plan function

This PR fixes Issue #3466 where the _create_reasoning_plan function was missing
the 'ready' parameter when called by the LLM. The fix makes the 'ready' parameter
optional with a default value of False, which allows the function to be called
with only the 'plan' argument.

Fixes #3466

* Change default value of 'ready' parameter to True

---------

Co-authored-by: João Moura <joaomdmoura@gmail.com>
2025-09-20 22:57:18 -03:00
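A simplified, standalone sketch illustrating only the defaulted parameter (the real `_create_reasoning_plan` lives inside CrewAI's reasoning logic and returns more than a plain dict; this is not the actual implementation):

```python
def _create_reasoning_plan(plan: str, ready: bool = True) -> dict:
    # `ready` defaults to True (per the follow-up commit), so a call that only
    # supplies `plan` no longer fails with a missing-argument error.
    return {"plan": plan, "ready": ready}


print(_create_reasoning_plan("Outline the research steps"))
```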
João Moura
47044450c0 Adding fallback to crew settings (#3562)
* Adding fallback to crew settings

* fix: resolve ruff and mypy issues in cli/config.py

---------

Co-authored-by: Greyson Lalonde <greyson.r.lalonde@gmail.com>
2025-09-20 22:54:36 -03:00
João Moura
0ee438c39d fix version (#3557) 2025-09-20 17:14:28 -03:00
Joao Moura
cbb9965bf7 preparing new version 2025-09-20 12:27:25 -07:00
João Moura
4951d30dd9 Fix issues with getting id (#3556)
* fix issues with getting id

* ignore linter

* fix: resolve ruff linting issues in tracing utils

---------

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2025-09-20 15:29:25 -03:00
Greyson LaLonde
7426969736 chore: apply ruff linting fixes and type annotations to memory module
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-09-19 22:20:13 -04:00
Greyson LaLonde
d879be8b66 chore: fix ruff linting issues in agents module
fix(agents): linting, import paths, cache key alignment, and static method
2025-09-19 22:11:21 -04:00
Greyson LaLonde
24b84a4b68 chore: apply ruff linting fixes to crews module 2025-09-19 22:02:22 -04:00
Greyson LaLonde
8e571ea8a7 chore: fix ruff linting and mypy issues in knowledge module
2025-09-19 21:39:15 -04:00
Greyson LaLonde
2cfc4d37b8 chore: apply ruff linting fixes to events module
fix: apply ruff linting to events
2025-09-19 20:10:55 -04:00
Greyson LaLonde
f4abc41235 chore: apply ruff linting fixes to CLI module
fix: apply ruff fixes to CLI and update Okta provider test
2025-09-19 19:55:55 -04:00
Greyson LaLonde
de5d3c3ad1 chore: add pydantic.mypy plugin for better type checking 2025-09-19 19:23:33 -04:00
Lorenze Jay
c062826779 chore: update dependencies and versioning for CrewAI 0.193.0 (#3542)
* chore: update dependencies and versioning for CrewAI

- Bump `crewai-tools` dependency version from `0.71.0` to `0.73.0` in `pyproject.toml`.
- Update CrewAI version from `0.186.1` to `0.193.0` in `__init__.py`.
- Adjust dependency versions in CLI templates for crew, flow, and tool to reflect the new CrewAI version.

This update ensures compatibility with the latest features and improvements in CrewAI.

* remove embedchain mock

* fix: remove last embedchain mocks

* fix: remove langchain_openai from tests

---------

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2025-09-19 16:01:55 -03:00
1101 changed files with 12118 additions and 53522 deletions

View File

@@ -6,7 +6,7 @@ permissions:
contents: read
env:
OPENAI_API_KEY: fake-openai-key
OPENAI_API_KEY: fake-api-key
PYTHONUNBUFFERED: 1
jobs:
@@ -45,7 +45,7 @@ jobs:
enable-cache: false
- name: Install the project
run: uv sync --all-groups --all-extras && uv sync --all-extras --package crewai-core && uv sync --all-extras --package crewai-tools
run: uv sync --all-groups --all-extras
- name: Restore test durations
uses: actions/cache/restore@v4
@@ -75,7 +75,7 @@ jobs:
# DURATIONS_ARG="--durations-path=${DURATION_FILE}"
# fi
uv run pytest packages/crewai/tests \
uv run pytest \
--block-network \
--timeout=30 \
-vv \

View File

@@ -1,84 +0,0 @@
name: Generate Tool Specifications
on:
push:
branches:
- main
permissions:
contents: write
pull-requests: write
jobs:
generate-specs:
if: github.event.head_commit.author.name != 'crewai-tools-spec-generator[bot]'
runs-on: ubuntu-latest
outputs:
specs_changed: ${{ steps.check_changes.outputs.specs_changed }}
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
enable-cache: true
- name: Set up Python
run: uv python install 3.12.8
- name: Install the project
run: uv sync --dev --all-extras
- name: Generate tool specifications
run: uv run python packages/tools/generate_tool_specs.py
- name: Configure Git and add upstream
run: |
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
git remote add upstream https://github.com/crewAIInc/crewAI-tools.git
git fetch upstream
- name: Check for changes in tool specifications
id: check_changes
run: |
git add packages/tools/tool.specs.json
if git diff --quiet --staged; then
echo "No changes detected in tool.specs.json"
echo "specs_changed=false" >> $GITHUB_OUTPUT
else
echo "Changes detected in tool.specs.json"
echo "specs_changed=true" >> $GITHUB_OUTPUT
fi
- name: Generate GitHub App token
if: steps.check_changes.outputs.specs_changed == 'true'
id: app-token
uses: tibdex/github-app-token@v2
with:
app_id: ${{ secrets.CREWAI_RELEASE_TOOL_APP_ID }}
private_key: ${{ secrets.CREWAI_RELEASE_TOOL_PRIVATE_KEY }}
- name: Create Pull Request
if: steps.check_changes.outputs.specs_changed == 'true'
uses: peter-evans/create-pull-request@v7
with:
token: ${{ steps.app-token.outputs.token }}
title: "Automatically update tool specifications"
base: main
branch: update-tool-specs
commit-message: "Automatically update tool specifications"
committer: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
delete-branch: true
notify-api:
if: github.event.head_commit.author.name == 'crewai-tools-spec-generator[bot]'
runs-on: ubuntu-latest
steps:
- name: Notify API about tool specification update
run: |
curl -X POST https://app.crewai.com/crewai_plus/api/v1/internal_tool_releases \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${{ secrets.CREWAI_RELEASE_TOOL_API_KEY }}"

View File

@@ -1,42 +0,0 @@
name: Tools Tests
on: [pull_request]
permissions:
contents: write
env:
OPENAI_API_KEY: fake-openai-key
BRAVE_API_KEY: fake-brave-key
SNOWFLAKE_USER: fake-snowflake-user
SNOWFLAKE_PASSWORD: fake-snowflake-password
SNOWFLAKE_ACCOUNT: fake-snowflake-account
SNOWFLAKE_WAREHOUSE: fake-snowflake-warehouse
SNOWFLAKE_DATABASE: fake-snowflake-database
SNOWFLAKE_SCHEMA: fake-snowflake-schema
EMBEDCHAIN_DB_URI: sqlite:///test.db
jobs:
tests:
runs-on: ubuntu-latest
timeout-minutes: 15
strategy:
matrix:
python-version: ['3.10', '3.11', '3.12', '3.13']
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
enable-cache: true
- name: Set up Python ${{ matrix.python-version }}
run: uv python install ${{ matrix.python-version }}
- name: Install the project
run: uv sync --all-groups --package crewai-tools --all-extras
- name: Run tests
run: uv run pytest packages/tools/tests -vv

View File

@@ -6,16 +6,14 @@ repos:
entry: uv run ruff check
language: system
types: [python]
files: ^(packages/crewai/src/|packages/tools/src/).*\.py$
- id: ruff-format
name: ruff-format
entry: uv run ruff format
language: system
types: [python]
files: ^(packages/crewai/src/|packages/tools/src/).*\.py$
- id: mypy
name: mypy
entry: uv run mypy
language: system
types: [python]
exclude: ^(packages/.*/tests/|packages/crewai/src/crewai/cli/templates/)
exclude: ^tests/

View File

@@ -5,6 +5,82 @@ icon: "clock"
mode: "wide"
---
<Update label="Sep 20, 2025">
## v0.193.2
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/0.193.2)
## What's Changed
- Updated pyproject templates to use the right version
</Update>
<Update label="Sep 20, 2025">
## v0.193.1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/0.193.1)
## What's Changed
- Series of minor fixes and linter improvements
</Update>
<Update label="Sep 19, 2025">
## v0.193.0
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/0.193.0)
## Core Improvements & Fixes
- Fixed handling of the `model` parameter during OpenAI adapter initialization
- Resolved test duration cache issues in CI workflows
- Fixed flaky test related to repeated tool usage by agents
- Added missing event exports to `__init__.py` for consistent module behavior
- Dropped message storage from metadata in Mem0 to reduce bloat
- Fixed L2 distance metric support for backward compatibility in vector search
## New Features & Enhancements
- Introduced thread-safe platform context management
- Added test duration caching for optimized `pytest-split` runs
- Added ephemeral trace improvements for better trace control
- Made search parameters for RAG, knowledge, and memory fully configurable
- Enabled ChromaDB to use OpenAI API for embedding functions
- Added deeper observability tools for user-level insights
- Unified RAG storage system with instance-specific client support
## Documentation & Guides
- Updated `RagTool` references to reflect CrewAI native RAG implementation
- Improved internal docs for `langgraph` and `openai` agent adapters with type annotations and docstrings
</Update>
<Update label="Sep 11, 2025">
## v0.186.1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/0.186.1)
## What's Changed
- Fixed version not being found and silently failing reversion
- Bumped CrewAI version to 0.186.1 and updated dependencies in the CLI
</Update>
<Update label="Sep 10, 2025">
## v0.186.0
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/0.186.0)
## What's Changed
- Refer to the GitHub release notes for detailed changes
</Update>
<Update label="Sep 04, 2025">
## v0.177.0

View File

@@ -404,6 +404,10 @@ crewai config reset
After resetting configuration, re-run `crewai login` to authenticate again.
</Tip>
<Tip>
The CrewAI CLI handles authentication to the Tool Repository automatically when adding packages to your project. Just prefix any `uv` command with `crewai` to use it, e.g. `crewai uv add requests`. For more information, see the [Tool Repository](https://docs.crewai.com/enterprise/features/tool-repository) docs.
</Tip>
<Note>
Configuration settings are stored in `~/.config/crewai/settings.json`. Some settings like organization name and UUID are read-only and managed through authentication and organization commands. Tool repository related settings are hidden and cannot be set directly by users.
</Note>

View File

@@ -52,6 +52,36 @@ researcher = Agent(
)
```
## Adding other packages after installing a tool
After installing a tool from the CrewAI Enterprise Tool Repository, use the `crewai uv` command to add other packages to your project.
Plain `uv` commands will fail because authentication to the Tool Repository is handled by the CLI. Running them through `crewai uv` lets you add packages without worrying about authentication.
Any `uv` command can be run through `crewai uv`, making it a convenient way to manage your project's dependencies without juggling authentication through environment variables or other methods.
Say you have installed a custom tool called "my-tool" from the CrewAI Enterprise Tool Repository:
```bash
crewai tool install my-tool
```
Now, to add another package to your project, you can use the following command:
```bash
crewai uv add requests
```
Other commands like `uv sync` or `uv remove` can also be used with the `crewai uv` command:
```bash
crewai uv sync
```
```bash
crewai uv remove requests
```
This will add the package to your project and update `pyproject.toml` accordingly.
## Creating and Publishing Tools
To create a new tool project:

View File

@@ -27,7 +27,7 @@ Follow the steps below to get Crewing! 🚣‍♂️
<Step title="Navigate to your new crew project">
<CodeGroup>
```shell Terminal
cd latest-ai-development
cd latest_ai_development
```
</CodeGroup>
</Step>

View File

@@ -9,7 +9,7 @@ mode: "wide"
## Description
The `RagTool` is designed to answer questions by leveraging the power of Retrieval-Augmented Generation (RAG) through EmbedChain.
The `RagTool` is designed to answer questions by leveraging the power of Retrieval-Augmented Generation (RAG) through CrewAI's native RAG system.
It provides a dynamic knowledge base that can be queried to retrieve relevant information from various data sources.
This tool is particularly useful for applications that require access to a vast array of information and need to provide contextually relevant answers.
@@ -76,8 +76,8 @@ The `RagTool` can be used with a wide variety of data sources, including:
The `RagTool` accepts the following parameters:
- **summarize**: Optional. Whether to summarize the retrieved content. Default is `False`.
- **adapter**: Optional. A custom adapter for the knowledge base. If not provided, an EmbedchainAdapter will be used.
- **config**: Optional. Configuration for the underlying EmbedChain App.
- **adapter**: Optional. A custom adapter for the knowledge base. If not provided, a CrewAIRagAdapter will be used.
- **config**: Optional. Configuration for the underlying CrewAI RAG system.
## Adding Content
@@ -130,44 +130,23 @@ from crewai_tools import RagTool
# Create a RAG tool with custom configuration
config = {
"app": {
"name": "custom_app",
},
"llm": {
"provider": "openai",
"vectordb": {
"provider": "qdrant",
"config": {
"model": "gpt-4",
"collection_name": "my-collection"
}
},
"embedding_model": {
"provider": "openai",
"config": {
"model": "text-embedding-ada-002"
"model": "text-embedding-3-small"
}
},
"vectordb": {
"provider": "elasticsearch",
"config": {
"collection_name": "my-collection",
"cloud_id": "deployment-name:xxxx",
"api_key": "your-key",
"verify_certs": False
}
},
"chunker": {
"chunk_size": 400,
"chunk_overlap": 100,
"length_function": "len",
"min_chunk_size": 0
}
}
rag_tool = RagTool(config=config, summarize=True)
```
The internal RAG tool utilizes the Embedchain adapter, allowing you to pass any configuration options that are supported by Embedchain.
You can refer to the [Embedchain documentation](https://docs.embedchain.ai/components/introduction) for details.
Make sure to review the configuration options available in the .yaml file.
## Conclusion
The `RagTool` provides a powerful way to create and query knowledge bases from various data sources. By leveraging Retrieval-Augmented Generation, it enables agents to access and retrieve relevant information efficiently, enhancing their ability to provide accurate and contextually appropriate responses.

View File

@@ -5,6 +5,82 @@ icon: "clock"
mode: "wide"
---
<Update label="2025년 9월 20일">
## v0.193.2
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/0.193.2)
## 변경 사항
- 올바른 버전을 사용하도록 pyproject 템플릿 업데이트
</Update>
<Update label="2025년 9월 20일">
## v0.193.1
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/0.193.1)
## 변경 사항
- 일련의 사소한 수정 및 린터 개선
</Update>
<Update label="2025년 9월 19일">
## v0.193.0
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/0.193.0)
## 핵심 개선 사항 및 수정 사항
- OpenAI 어댑터 초기화 중 `model` 매개변수 처리 수정
- CI 워크플로에서 테스트 소요 시간 캐시 문제 해결
- 에이전트의 반복 도구 사용과 관련된 불안정한 테스트 수정
- 일관된 모듈 동작을 위해 누락된 이벤트 내보내기를 `__init__.py`에 추가
- 메타데이터 부하를 줄이기 위해 Mem0에서 메시지 저장 제거
- 벡터 검색의 하위 호환성을 위해 L2 거리 메트릭 지원 수정
## 새로운 기능 및 향상 사항
- 스레드 안전한 플랫폼 컨텍스트 관리 도입
- `pytest-split` 실행 최적화를 위한 테스트 소요 시간 캐싱 추가
- 더 나은 추적 제어를 위한 일시적(trace) 개선
- RAG, 지식, 메모리 검색 매개변수를 완전 구성 가능하게 변경
- ChromaDB가 임베딩 함수에 OpenAI API를 사용할 수 있도록 지원
- 사용자 수준 인사이트를 위한 심화된 관찰 가능성 도구 추가
- 인스턴스별 클라이언트를 지원하는 통합 RAG 스토리지 시스템
## 문서 및 가이드
- CrewAI 네이티브 RAG 구현을 반영하도록 `RagTool` 참조 업데이트
- 타입 주석과 도크스트링을 포함해 `langgraph` 및 `openai` 에이전트 어댑터 내부 문서 개선
</Update>
<Update label="2025년 9월 11일">
## v0.186.1
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/0.186.1)
## 변경 사항
- 버전을 찾지 못해 조용히 되돌리는(reversion) 문제 수정
- CLI에서 CrewAI 버전을 0.186.1로 올리고 의존성 업데이트
</Update>
<Update label="2025년 9월 10일">
## v0.186.0
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/0.186.0)
## 변경 사항
- 자세한 변경 사항은 GitHub 릴리스 노트를 참조하세요
</Update>
<Update label="2025년 9월 4일">
## v0.177.0

View File

@@ -27,7 +27,7 @@ mode: "wide"
<Step title="새로운 crew 프로젝트로 이동하기">
<CodeGroup>
```shell Terminal
cd latest-ai-development
cd latest_ai_development
```
</CodeGroup>
</Step>

View File

@@ -5,6 +5,82 @@ icon: "clock"
mode: "wide"
---
<Update label="20 set 2025">
## v0.193.2
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/0.193.2)
## O que Mudou
- Atualizados templates do pyproject para usar a versão correta
</Update>
<Update label="20 set 2025">
## v0.193.1
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/0.193.1)
## O que Mudou
- Série de pequenas correções e melhorias de linter
</Update>
<Update label="19 set 2025">
## v0.193.0
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/0.193.0)
## Melhorias e Correções Principais
- Corrigido manuseio do parâmetro `model` durante a inicialização do adaptador OpenAI
- Resolvidos problemas de cache da duração de testes nos fluxos de CI
- Corrigido teste instável relacionado ao uso repetido de ferramentas pelos agentes
- Adicionadas exportações de eventos ausentes no `__init__.py` para comportamento consistente do módulo
- Removido armazenamento de mensagem dos metadados no Mem0 para reduzir inchaço
- Corrigido suporte à métrica de distância L2 para compatibilidade retroativa na busca vetorial
## Novos Recursos e Melhorias
- Introduzida gestão de contexto de plataforma com segurança de threads
- Adicionado cache da duração de testes para execuções otimizadas do `pytest-split`
- Melhorias de traces efêmeros para melhor controle de rastreamento
- Parâmetros de busca para RAG, conhecimento e memória totalmente configuráveis
- Habilitado ChromaDB para usar a OpenAI API para funções de embedding
- Adicionadas ferramentas de observabilidade mais profundas para insights ao nível do usuário
- Sistema de armazenamento RAG unificado com suporte a cliente específico por instância
## Documentação e Guias
- Atualizadas referências do `RagTool` para refletir a implementação nativa de RAG do CrewAI
- Melhorada documentação interna para adaptadores de agente `langgraph` e `openai` com anotações de tipo e docstrings
</Update>
<Update label="11 set 2025">
## v0.186.1
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/0.186.1)
## O que Mudou
- Corrigida falha silenciosa de reversão quando a versão não era encontrada
- Versão do CrewAI atualizada para 0.186.1 e dependências do CLI atualizadas
</Update>
<Update label="10 set 2025">
## v0.186.0
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/0.186.0)
## O que Mudou
- Consulte as notas de lançamento no GitHub para detalhes completos
</Update>
<Update label="04 set 2025">
## v0.177.0

View File

@@ -27,7 +27,7 @@ Follow the steps below to get Crewing! 🚣‍♂️
<Step title="Navigate to your new crew project">
<CodeGroup>
```shell Terminal
cd latest-ai-development
cd latest_ai_development
```
</CodeGroup>
</Step>

View File

@@ -1,777 +0,0 @@
<p align="center">
<a href="https://github.com/crewAIInc/crewAI">
<img src="docs/images/crewai_logo.png" width="600px" alt="Open source Multi-AI Agent orchestration framework">
</a>
</p>
<p align="center" style="display: flex; justify-content: center; gap: 20px; align-items: center;">
<a href="https://trendshift.io/repositories/11239" target="_blank">
<img src="https://trendshift.io/api/badge/repositories/11239" alt="crewAIInc%2FcrewAI | Trendshift" style="width: 250px; height: 55px;" width="250" height="55"/>
</a>
</p>
<p align="center">
<a href="https://crewai.com">Homepage</a>
·
<a href="https://docs.crewai.com">Docs</a>
·
<a href="https://app.crewai.com">Start Cloud Trial</a>
·
<a href="https://blog.crewai.com">Blog</a>
·
<a href="https://community.crewai.com">Forum</a>
</p>
<p align="center">
<a href="https://github.com/crewAIInc/crewAI">
<img src="https://img.shields.io/github/stars/crewAIInc/crewAI" alt="GitHub Repo stars">
</a>
<a href="https://github.com/crewAIInc/crewAI/network/members">
<img src="https://img.shields.io/github/forks/crewAIInc/crewAI" alt="GitHub forks">
</a>
<a href="https://github.com/crewAIInc/crewAI/issues">
<img src="https://img.shields.io/github/issues/crewAIInc/crewAI" alt="GitHub issues">
</a>
<a href="https://github.com/crewAIInc/crewAI/pulls">
<img src="https://img.shields.io/github/issues-pr/crewAIInc/crewAI" alt="GitHub pull requests">
</a>
<a href="https://opensource.org/licenses/MIT">
<img src="https://img.shields.io/badge/License-MIT-green.svg" alt="License: MIT">
</a>
</p>
<p align="center">
<a href="https://pypi.org/project/crewai/">
<img src="https://img.shields.io/pypi/v/crewai" alt="PyPI version">
</a>
<a href="https://pypi.org/project/crewai/">
<img src="https://img.shields.io/pypi/dm/crewai" alt="PyPI downloads">
</a>
<a href="https://twitter.com/crewAIInc">
<img src="https://img.shields.io/twitter/follow/crewAIInc?style=social" alt="Twitter Follow">
</a>
</p>
### Fast and Flexible Multi-Agent Automation Framework
> CrewAI is a lean, lightning-fast Python framework built entirely from scratch—completely **independent of LangChain or other agent frameworks**.
> It empowers developers with both high-level simplicity and precise low-level control, ideal for creating autonomous AI agents tailored to any scenario.
- **CrewAI Crews**: Optimize for autonomy and collaborative intelligence.
- **CrewAI Flows**: Enable granular, event-driven control and single LLM calls for precise task orchestration, with native support for Crews
With over 100,000 developers certified through our community courses at [learn.crewai.com](https://learn.crewai.com), CrewAI is rapidly becoming the
standard for enterprise-ready AI automation.
# CrewAI Enterprise Suite
CrewAI Enterprise Suite is a comprehensive bundle tailored for organizations that require secure, scalable, and easy-to-manage agent-driven automation.
You can try one part of the suite, the [Crew Control Plane](https://app.crewai.com), for free.
## Crew Control Plane Key Features:
- **Tracing & Observability**: Monitor and track your AI agents and workflows in real-time, including metrics, logs, and traces.
- **Unified Control Plane**: A centralized platform for managing, monitoring, and scaling your AI agents and workflows.
- **Seamless Integrations**: Easily connect with existing enterprise systems, data sources, and cloud infrastructure.
- **Advanced Security**: Built-in robust security and compliance measures ensuring safe deployment and management.
- **Actionable Insights**: Real-time analytics and reporting to optimize performance and decision-making.
- **24/7 Support**: Dedicated enterprise support to ensure uninterrupted operation and quick resolution of issues.
- **On-premise and Cloud Deployment Options**: Deploy CrewAI Enterprise on-premise or in the cloud, depending on your security and compliance requirements.
CrewAI Enterprise is designed for enterprises seeking a powerful, reliable solution to transform complex business processes into efficient,
intelligent automations.
## Table of contents
- [Why CrewAI?](#why-crewai)
- [Getting Started](#getting-started)
- [Key Features](#key-features)
- [Understanding Flows and Crews](#understanding-flows-and-crews)
- [CrewAI vs LangGraph](#how-crewai-compares)
- [Examples](#examples)
- [Quick Tutorial](#quick-tutorial)
- [Write Job Descriptions](#write-job-descriptions)
- [Trip Planner](#trip-planner)
- [Stock Analysis](#stock-analysis)
- [Using Crews and Flows Together](#using-crews-and-flows-together)
- [Connecting Your Crew to a Model](#connecting-your-crew-to-a-model)
- [How CrewAI Compares](#how-crewai-compares)
- [Frequently Asked Questions (FAQ)](#frequently-asked-questions-faq)
- [Contribution](#contribution)
- [Telemetry](#telemetry)
- [License](#license)
## Why CrewAI?
<div align="center" style="margin-bottom: 30px;">
<img src="docs/images/asset.png" alt="CrewAI Logo" width="100%">
</div>
CrewAI unlocks the true potential of multi-agent automation, delivering the best-in-class combination of speed, flexibility, and control with either Crews of AI Agents or Flows of Events:
- **Standalone Framework**: Built from scratch, independent of LangChain or any other agent framework.
- **High Performance**: Optimized for speed and minimal resource usage, enabling faster execution.
- **Flexible Low Level Customization**: Complete freedom to customize at both high and low levels - from overall workflows and system architecture to granular agent behaviors, internal prompts, and execution logic.
- **Ideal for Every Use Case**: Proven effective for both simple tasks and highly complex, real-world, enterprise-grade scenarios.
- **Robust Community**: Backed by a rapidly growing community of over **100,000 certified** developers offering comprehensive support and resources.
CrewAI empowers developers and enterprises to confidently build intelligent automations, bridging the gap between simplicity, flexibility, and performance.
## Getting Started
Set up and run your first CrewAI agents by following this tutorial.
[![CrewAI Getting Started Tutorial](https://img.youtube.com/vi/-kSOTtYzgEw/hqdefault.jpg)](https://www.youtube.com/watch?v=-kSOTtYzgEw "CrewAI Getting Started Tutorial")
### Learning Resources
Learn CrewAI through our comprehensive courses:
- [Multi AI Agent Systems with CrewAI](https://www.deeplearning.ai/short-courses/multi-ai-agent-systems-with-crewai/) - Master the fundamentals of multi-agent systems
- [Practical Multi AI Agents and Advanced Use Cases](https://www.deeplearning.ai/short-courses/practical-multi-ai-agents-and-advanced-use-cases-with-crewai/) - Deep dive into advanced implementations
### Understanding Flows and Crews
CrewAI offers two powerful, complementary approaches that work seamlessly together to build sophisticated AI applications:
1. **Crews**: Teams of AI agents with true autonomy and agency, working together to accomplish complex tasks through role-based collaboration. Crews enable:
- Natural, autonomous decision-making between agents
- Dynamic task delegation and collaboration
- Specialized roles with defined goals and expertise
- Flexible problem-solving approaches
2. **Flows**: Production-ready, event-driven workflows that deliver precise control over complex automations. Flows provide:
- Fine-grained control over execution paths for real-world scenarios
- Secure, consistent state management between tasks
- Clean integration of AI agents with production Python code
- Conditional branching for complex business logic
The true power of CrewAI emerges when combining Crews and Flows. This synergy allows you to:
- Build complex, production-grade applications
- Balance autonomy with precise control
- Handle sophisticated real-world scenarios
- Maintain clean, maintainable code structure
### Getting Started with Installation
To get started with CrewAI, follow these simple steps:
### 1. Installation
Ensure you have Python >=3.10 <3.14 installed on your system. CrewAI uses [UV](https://docs.astral.sh/uv/) for dependency management and package handling, offering a seamless setup and execution experience.
First, install CrewAI:
```shell
pip install crewai
```
If you want to install the 'crewai' package along with its optional features that include additional tools for agents, you can do so by using the following command:
```shell
pip install 'crewai[tools]'
```
The command above installs the basic package and also adds extra components which require more dependencies to function.
### Troubleshooting Dependencies
If you encounter issues during installation or usage, here are some common solutions:
#### Common Issues
1. **ModuleNotFoundError: No module named 'tiktoken'**
- Install tiktoken explicitly: `pip install 'crewai[embeddings]'`
- If using embedchain or other tools: `pip install 'crewai[tools]'`
2. **Failed building wheel for tiktoken**
- Ensure Rust compiler is installed (see installation steps above)
- For Windows: Verify Visual C++ Build Tools are installed
- Try upgrading pip: `pip install --upgrade pip`
- If issues persist, use a pre-built wheel: `pip install tiktoken --prefer-binary`
### 2. Setting Up Your Crew with the YAML Configuration
To create a new CrewAI project, run the following CLI (Command Line Interface) command:
```shell
crewai create crew <project_name>
```
This command creates a new project folder with the following structure:
```
my_project/
├── .gitignore
├── pyproject.toml
├── README.md
├── .env
└── src/
└── my_project/
├── __init__.py
├── main.py
├── crew.py
├── tools/
│ ├── custom_tool.py
│ └── __init__.py
└── config/
├── agents.yaml
└── tasks.yaml
```
You can now start developing your crew by editing the files in the `src/my_project` folder. The `main.py` file is the entry point of the project, the `crew.py` file is where you define your crew, the `agents.yaml` file is where you define your agents, and the `tasks.yaml` file is where you define your tasks.
#### To customize your project, you can:
- Modify `src/my_project/config/agents.yaml` to define your agents.
- Modify `src/my_project/config/tasks.yaml` to define your tasks.
- Modify `src/my_project/crew.py` to add your own logic, tools, and specific arguments.
- Modify `src/my_project/main.py` to add custom inputs for your agents and tasks.
- Add your environment variables into the `.env` file.
#### Example of a simple crew with a sequential process:
Instantiate your crew:
```shell
crewai create crew latest-ai-development
```
Modify the files as needed to fit your use case:
**agents.yaml**
```yaml
# src/my_project/config/agents.yaml
researcher:
role: >
{topic} Senior Data Researcher
goal: >
Uncover cutting-edge developments in {topic}
backstory: >
You're a seasoned researcher with a knack for uncovering the latest
developments in {topic}. Known for your ability to find the most relevant
information and present it in a clear and concise manner.
reporting_analyst:
role: >
{topic} Reporting Analyst
goal: >
Create detailed reports based on {topic} data analysis and research findings
backstory: >
You're a meticulous analyst with a keen eye for detail. You're known for
your ability to turn complex data into clear and concise reports, making
it easy for others to understand and act on the information you provide.
```
**tasks.yaml**
```yaml
# src/my_project/config/tasks.yaml
research_task:
description: >
Conduct a thorough research about {topic}
Make sure you find any interesting and relevant information given
the current year is 2025.
expected_output: >
A list with 10 bullet points of the most relevant information about {topic}
agent: researcher
reporting_task:
description: >
Review the context you got and expand each topic into a full section for a report.
Make sure the report is detailed and contains any and all relevant information.
expected_output: >
A fully fledged report with the main topics, each with a full section of information.
Formatted as markdown without '```'
agent: reporting_analyst
output_file: report.md
```
**crew.py**
```python
# src/my_project/crew.py
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai_tools import SerperDevTool
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List
@CrewBase
class LatestAiDevelopmentCrew():
"""LatestAiDevelopment crew"""
agents: List[BaseAgent]
tasks: List[Task]
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
verbose=True,
tools=[SerperDevTool()]
)
@agent
def reporting_analyst(self) -> Agent:
return Agent(
config=self.agents_config['reporting_analyst'],
verbose=True
)
@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
)
@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
output_file='report.md'
)
@crew
def crew(self) -> Crew:
"""Creates the LatestAiDevelopment crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
)
```
**main.py**
```python
#!/usr/bin/env python
# src/my_project/main.py
import sys
from latest_ai_development.crew import LatestAiDevelopmentCrew
def run():
"""
Run the crew.
"""
inputs = {
'topic': 'AI Agents'
}
LatestAiDevelopmentCrew().crew().kickoff(inputs=inputs)
```
### 3. Running Your Crew
Before running your crew, make sure you have the following keys set as environment variables in your `.env` file:
- An [OpenAI API key](https://platform.openai.com/account/api-keys) (or other LLM API key): `OPENAI_API_KEY=sk-...`
- A [Serper.dev](https://serper.dev/) API key: `SERPER_API_KEY=YOUR_KEY_HERE`
Lock the dependencies and install them using the CLI command, but first navigate to your project directory:
```shell
cd my_project
crewai install (Optional)
```
To run your crew, execute the following command in the root of your project:
```bash
crewai run
```
or
```bash
python src/my_project/main.py
```
If an error happens due to the usage of poetry, please run the following command to update your crewai package:
```bash
crewai update
```
You should see the output in the console and the `report.md` file should be created in the root of your project with the full final report.
In addition to the sequential process, you can use the hierarchical process, which automatically assigns a manager to the defined crew to properly coordinate the planning and execution of tasks through delegation and validation of results. [See more about the processes here](https://docs.crewai.com/core-concepts/Processes/).
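As a minimal sketch (agent and task contents are placeholders; `manager_llm` is one way to supply the coordinating model):

```python
from crewai import Agent, Crew, Process, Task

analyst = Agent(role="Analyst", goal="Analyze the data", backstory="Detail oriented")
writer = Agent(role="Writer", goal="Write the report", backstory="A clear communicator")

report = Task(
    description="Produce a short report from the analysis",
    expected_output="A one-page report",
    agent=writer,
)

# The hierarchical process adds an auto-generated manager that delegates work
# to the other agents and validates their results.
crew = Crew(
    agents=[analyst, writer],
    tasks=[report],
    process=Process.hierarchical,
    manager_llm="gpt-4o",
)
```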
## Key Features
CrewAI stands apart as a lean, standalone, high-performance multi-AI Agent framework delivering simplicity, flexibility, and precise control—free from the complexity and limitations found in other agent frameworks.
- **Standalone & Lean**: Completely independent from other frameworks like LangChain, offering faster execution and lighter resource demands.
- **Flexible & Precise**: Easily orchestrate autonomous agents through intuitive [Crews](https://docs.crewai.com/concepts/crews) or precise [Flows](https://docs.crewai.com/concepts/flows), achieving perfect balance for your needs.
- **Seamless Integration**: Effortlessly combine Crews (autonomy) and Flows (precision) to create complex, real-world automations.
- **Deep Customization**: Tailor every aspect—from high-level workflows down to low-level internal prompts and agent behaviors.
- **Reliable Performance**: Consistent results across simple tasks and complex, enterprise-level automations.
- **Thriving Community**: Backed by robust documentation and over 100,000 certified developers, providing exceptional support and guidance.
Choose CrewAI to easily build powerful, adaptable, and production-ready AI automations.
## Examples
You can test different real life examples of AI crews in the [CrewAI-examples repo](https://github.com/crewAIInc/crewAI-examples?tab=readme-ov-file):
- [Landing Page Generator](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/landing_page_generator)
- [Having Human input on the execution](https://docs.crewai.com/how-to/Human-Input-on-Execution)
- [Trip Planner](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/trip_planner)
- [Stock Analysis](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/stock_analysis)
### Quick Tutorial
[![CrewAI Tutorial](https://img.youtube.com/vi/tnejrr-0a94/maxresdefault.jpg)](https://www.youtube.com/watch?v=tnejrr-0a94 "CrewAI Tutorial")
### Write Job Descriptions
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/job-posting) or watch a video below:
[![Jobs postings](https://img.youtube.com/vi/u98wEMz-9to/maxresdefault.jpg)](https://www.youtube.com/watch?v=u98wEMz-9to "Jobs postings")
### Trip Planner
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/trip_planner) or watch a video below:
[![Trip Planner](https://img.youtube.com/vi/xis7rWp-hjs/maxresdefault.jpg)](https://www.youtube.com/watch?v=xis7rWp-hjs "Trip Planner")
### Stock Analysis
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/stock_analysis) or watch a video below:
[![Stock Analysis](https://img.youtube.com/vi/e0Uj4yWdaAg/maxresdefault.jpg)](https://www.youtube.com/watch?v=e0Uj4yWdaAg "Stock Analysis")
### Using Crews and Flows Together
CrewAI's power truly shines when combining Crews with Flows to create sophisticated automation pipelines.
CrewAI flows support logical operators like `or_` and `and_` to combine multiple conditions. These can be used with the `@start`, `@listen`, or `@router` decorators to create complex triggering conditions.
- `or_`: Triggers when any of the specified conditions are met.
- `and_`: Triggers when all of the specified conditions are met (see the short `and_` sketch after the example below).
Here's how you can orchestrate multiple Crews within a Flow:
```python
from crewai.flow.flow import Flow, listen, start, router, or_
from crewai import Crew, Agent, Task, Process
from pydantic import BaseModel
# Define structured state for precise control
class MarketState(BaseModel):
sentiment: str = "neutral"
confidence: float = 0.0
recommendations: list = []
class AdvancedAnalysisFlow(Flow[MarketState]):
@start()
def fetch_market_data(self):
# Demonstrate low-level control with structured state
self.state.sentiment = "analyzing"
return {"sector": "tech", "timeframe": "1W"} # These parameters match the task description template
@listen(fetch_market_data)
def analyze_with_crew(self, market_data):
# Show crew agency through specialized roles
analyst = Agent(
role="Senior Market Analyst",
goal="Conduct deep market analysis with expert insight",
backstory="You're a veteran analyst known for identifying subtle market patterns"
)
researcher = Agent(
role="Data Researcher",
goal="Gather and validate supporting market data",
backstory="You excel at finding and correlating multiple data sources"
)
analysis_task = Task(
description="Analyze {sector} sector data for the past {timeframe}",
expected_output="Detailed market analysis with confidence score",
agent=analyst
)
research_task = Task(
description="Find supporting data to validate the analysis",
expected_output="Corroborating evidence and potential contradictions",
agent=researcher
)
# Demonstrate crew autonomy
analysis_crew = Crew(
agents=[analyst, researcher],
tasks=[analysis_task, research_task],
process=Process.sequential,
verbose=True
)
return analysis_crew.kickoff(inputs=market_data) # Pass market_data as named inputs
@router(analyze_with_crew)
def determine_next_steps(self):
# Show flow control with conditional routing
if self.state.confidence > 0.8:
return "high_confidence"
elif self.state.confidence > 0.5:
return "medium_confidence"
return "low_confidence"
@listen("high_confidence")
def execute_strategy(self):
# Demonstrate complex decision making
strategy_crew = Crew(
agents=[
Agent(role="Strategy Expert",
goal="Develop optimal market strategy")
],
tasks=[
Task(description="Create detailed strategy based on analysis",
expected_output="Step-by-step action plan")
]
)
return strategy_crew.kickoff()
@listen(or_("medium_confidence", "low_confidence"))
def request_additional_analysis(self):
self.state.recommendations.append("Gather more data")
return "Additional analysis required"
```
This example demonstrates how to:
1. Use Python code for basic data operations
2. Create and execute Crews as steps in your workflow
3. Use Flow decorators to manage the sequence of operations
4. Implement conditional branching based on Crew results
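The flow above only exercises `or_`; a minimal sketch of `and_` (step names are made up for illustration) waits for every listed step before firing:

```python
from crewai.flow.flow import Flow, and_, listen, start


class ReviewFlow(Flow):
    @start()
    def draft(self):
        return "draft ready"

    @start()
    def collect_style_guide(self):
        return "style guide ready"

    # Runs only after BOTH upstream steps have completed.
    @listen(and_(draft, collect_style_guide))
    def review(self):
        return "review complete"


ReviewFlow().kickoff()
```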
## Connecting Your Crew to a Model
CrewAI supports using various LLMs through a variety of connection options. By default your agents will use the OpenAI API when querying the model. However, there are several other ways to allow your agents to connect to models. For example, you can configure your agents to use a local model via the Ollama tool.
Please refer to the [Connect CrewAI to LLMs](https://docs.crewai.com/how-to/LLM-Connections/) page for details on configuring your agents' connections to models.
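As one hedged illustration (the model name and local URL are placeholders, assuming an Ollama server is already running with the model pulled):

```python
from crewai import Agent, LLM

# Point the agent at a local Ollama model instead of a hosted API.
local_llm = LLM(
    model="ollama/llama3.1",
    base_url="http://localhost:11434",
)

agent = Agent(
    role="Local Researcher",
    goal="Answer questions without calling a hosted API",
    backstory="Runs entirely on a local model",
    llm=local_llm,
)
```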
## How CrewAI Compares
**CrewAI's Advantage**: CrewAI combines autonomous agent intelligence with precise workflow control through its unique Crews and Flows architecture. The framework excels at both high-level orchestration and low-level customization, enabling complex, production-grade systems with granular control.
- **LangGraph**: While LangGraph provides a foundation for building agent workflows, its approach requires significant boilerplate code and complex state management patterns. The framework's tight coupling with LangChain can limit flexibility when implementing custom agent behaviors or integrating with external systems.
*P.S. CrewAI demonstrates significant performance advantages over LangGraph, executing 5.76x faster in certain cases like this QA task example ([see comparison](https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks/CrewAI%20Flows%20%26%20Langgraph/QA%20Agent)) while achieving higher evaluation scores with faster completion times in certain coding tasks, like in this example ([detailed analysis](https://github.com/crewAIInc/crewAI-examples/blob/main/Notebooks/CrewAI%20Flows%20%26%20Langgraph/Coding%20Assistant/coding_assistant_eval.ipynb)).*
- **Autogen**: While Autogen excels at creating conversational agents capable of working together, it lacks an inherent concept of process. In Autogen, orchestrating agents' interactions requires additional programming, which can become complex and cumbersome as the scale of tasks grows.
- **ChatDev**: ChatDev introduced the idea of processes into the realm of AI agents, but its implementation is quite rigid. Customizations in ChatDev are limited and not geared towards production environments, which can hinder scalability and flexibility in real-world applications.
## Contribution
CrewAI is open-source and we welcome contributions. If you're looking to contribute, please:
- Fork the repository.
- Create a new branch for your feature.
- Add your feature or improvement.
- Send a pull request.
- We appreciate your input!
### Installing Dependencies
```bash
uv lock
uv sync
```
### Virtual Env
```bash
uv venv
```
### Pre-commit hooks
```bash
pre-commit install
```
### Running Tests
```bash
uv run pytest .
```
### Running static type checks
```bash
uvx mypy src
```
### Packaging
```bash
uv build
```
### Installing Locally
```bash
pip install dist/*.tar.gz
```
## Telemetry
CrewAI uses anonymous telemetry to collect usage data with the main purpose of helping us improve the library by focusing our efforts on the most used features, integrations and tools.
It's pivotal to understand that **NO data is collected** concerning prompts, task descriptions, agents' backstories or goals, usage of tools, API calls, responses, any data processed by the agents, or secrets and environment variables, except under the conditions described below. When the `share_crew` feature is enabled, detailed data including task descriptions, agents' backstories or goals, and other specific attributes is collected to provide deeper insights while respecting user privacy. Users can disable telemetry by setting the environment variable `OTEL_SDK_DISABLED` to true.
Data collected includes:
- Version of CrewAI
- So we can understand how many users are using the latest version
- Version of Python
- So we can decide on what versions to better support
- General OS (e.g. number of CPUs, macOS/Windows/Linux)
- So we know what OS we should focus on and if we could build specific OS related features
- Number of agents and tasks in a crew
- So we make sure we are testing internally with similar use cases and educate people on the best practices
- Crew Process being used
- Understand where we should focus our efforts
- If Agents are using memory or allowing delegation
- Understand if we improved the features or maybe even drop them
- If Tasks are being executed in parallel or sequentially
- Understand if we should focus more on parallel execution
- Language model being used
- So we can improve support for the most used language models
- Roles of agents in a crew
- Understand high level use cases so we can build better tools, integrations and examples about it
- Tools names available
- Understand out of the publicly available tools, which ones are being used the most so we can improve them
Users can opt-in to Further Telemetry, sharing the complete telemetry data by setting the `share_crew` attribute to `True` on their Crews. Enabling `share_crew` results in the collection of detailed crew and task execution data, including `goal`, `backstory`, `context`, and `output` of tasks. This enables a deeper insight into usage patterns while respecting the user's choice to share.
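For reference, a small sketch of both telemetry controls mentioned above (the agent and task are placeholders):

```python
import os

from crewai import Agent, Crew, Task

# Opt out of anonymous telemetry entirely.
os.environ["OTEL_SDK_DISABLED"] = "true"

# Or opt in to the extended telemetry described above for a specific crew.
agent = Agent(role="Writer", goal="Write a note", backstory="Brief and clear")
task = Task(description="Write one sentence", expected_output="One sentence", agent=agent)
crew = Crew(agents=[agent], tasks=[task], share_crew=True)
```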
## License
CrewAI is released under the [MIT License](https://github.com/crewAIInc/crewAI/blob/main/LICENSE).
## Frequently Asked Questions (FAQ)
### General
- [What exactly is CrewAI?](#q-what-exactly-is-crewai)
- [How do I install CrewAI?](#q-how-do-i-install-crewai)
- [Does CrewAI depend on LangChain?](#q-does-crewai-depend-on-langchain)
- [Is CrewAI open-source?](#q-is-crewai-open-source)
- [Does CrewAI collect data from users?](#q-does-crewai-collect-data-from-users)
### Features and Capabilities
- [Can CrewAI handle complex use cases?](#q-can-crewai-handle-complex-use-cases)
- [Can I use CrewAI with local AI models?](#q-can-i-use-crewai-with-local-ai-models)
- [What makes Crews different from Flows?](#q-what-makes-crews-different-from-flows)
- [How is CrewAI better than LangChain?](#q-how-is-crewai-better-than-langchain)
- [Does CrewAI support fine-tuning or training custom models?](#q-does-crewai-support-fine-tuning-or-training-custom-models)
- [Can CrewAI agents interact with external tools and APIs?](#q-can-crewai-agents-interact-with-external-tools-and-apis)
- [Is CrewAI suitable for production environments?](#q-is-crewai-suitable-for-production-environments)
- [How scalable is CrewAI?](#q-how-scalable-is-crewai)
- [Does CrewAI offer debugging and monitoring tools?](#q-does-crewai-offer-debugging-and-monitoring-tools)
- [What programming languages does CrewAI support?](#q-what-programming-languages-does-crewai-support)
- [Can CrewAI automate human-in-the-loop workflows?](#q-can-crewai-automate-human-in-the-loop-workflows)
### Resources and Community
- [Where can I find real-world CrewAI examples?](#q-where-can-i-find-real-world-crewai-examples)
- [How can I contribute to CrewAI?](#q-how-can-i-contribute-to-crewai)
- [Does CrewAI offer educational resources for beginners?](#q-does-crewai-offer-educational-resources-for-beginners)
### Enterprise Features
- [What additional features does CrewAI Enterprise offer?](#q-what-additional-features-does-crewai-enterprise-offer)
- [Is CrewAI Enterprise available for cloud and on-premise deployments?](#q-is-crewai-enterprise-available-for-cloud-and-on-premise-deployments)
- [Can I try CrewAI Enterprise for free?](#q-can-i-try-crewai-enterprise-for-free)
### Q: What exactly is CrewAI?
A: CrewAI is a standalone, lean, and fast Python framework built specifically for orchestrating autonomous AI agents. Unlike frameworks like LangChain, CrewAI does not rely on external dependencies, making it leaner, faster, and simpler.
### Q: How do I install CrewAI?
A: Install CrewAI using pip:
```shell
pip install crewai
```
For additional tools, use:
```shell
pip install 'crewai[tools]'
```
### Q: Does CrewAI depend on LangChain?
A: No. CrewAI is built entirely from the ground up, with no dependencies on LangChain or other agent frameworks. This ensures a lean, fast, and flexible experience.
### Q: Can CrewAI handle complex use cases?
A: Yes. CrewAI excels at both simple and highly complex real-world scenarios, offering deep customization options at both high and low levels, from internal prompts to sophisticated workflow orchestration.
### Q: Can I use CrewAI with local AI models?
A: Absolutely! CrewAI supports various language models, including local ones. Tools like Ollama and LM Studio allow seamless integration. Check the [LLM Connections documentation](https://docs.crewai.com/how-to/LLM-Connections/) for more details.
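For illustration, a locally served model can be wired in through CrewAI's `LLM` class; the model name and port below assume a default Ollama setup:
```python
from crewai import LLM, Agent

# Assumes `ollama run llama3.1` is serving on the default port
local_llm = LLM(model="ollama/llama3.1", base_url="http://localhost:11434")

researcher = Agent(
    role="Researcher",
    goal="Answer questions using a local model",
    backstory="Runs fully offline",
    llm=local_llm,
)
```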
### Q: What makes Crews different from Flows?
A: Crews provide autonomous agent collaboration, ideal for tasks requiring flexible decision-making and dynamic interaction. Flows offer precise, event-driven control, ideal for managing detailed execution paths and secure state management. You can seamlessly combine both for maximum effectiveness.
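As a rough sketch of how the two can be combined (class and step names here are illustrative):
```python
from crewai.flow.flow import Flow, listen, start

class ResearchPipeline(Flow):
    @start()
    def pick_topic(self) -> str:
        # Deterministic, event-driven step controlled by the Flow
        return "state of open-source LLMs"

    @listen(pick_topic)
    def run_crew(self, topic: str) -> str:
        # An autonomous Crew could be kicked off here, e.g.
        # research_crew.kickoff(inputs={"topic": topic})
        return f"report about {topic}"

# ResearchPipeline().kickoff() runs the steps in order
```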
### Q: How is CrewAI better than LangChain?
A: CrewAI provides simpler, more intuitive APIs, faster execution speeds, more reliable and consistent results, robust documentation, and an active community—addressing common criticisms and limitations associated with LangChain.
### Q: Is CrewAI open-source?
A: Yes, CrewAI is open-source and actively encourages community contributions and collaboration.
### Q: Does CrewAI collect data from users?
A: CrewAI collects anonymous telemetry data strictly for improvement purposes. Sensitive data such as prompts, task descriptions, or API responses is never collected unless the user explicitly opts in by enabling `share_crew`.
### Q: Where can I find real-world CrewAI examples?
A: Check out practical examples in the [CrewAI-examples repository](https://github.com/crewAIInc/crewAI-examples), covering use cases like trip planners, stock analysis, and job postings.
### Q: How can I contribute to CrewAI?
A: Contributions are warmly welcomed! Fork the repository, create your branch, implement your changes, and submit a pull request. See the Contribution section of the README for detailed guidelines.
### Q: What additional features does CrewAI Enterprise offer?
A: CrewAI Enterprise provides advanced features such as a unified control plane, real-time observability, secure integrations, advanced security, actionable insights, and dedicated 24/7 enterprise support.
### Q: Is CrewAI Enterprise available for cloud and on-premise deployments?
A: Yes, CrewAI Enterprise supports both cloud-based and on-premise deployment options, allowing enterprises to meet their specific security and compliance requirements.
### Q: Can I try CrewAI Enterprise for free?
A: Yes, you can explore part of the CrewAI Enterprise Suite by accessing the [Crew Control Plane](https://app.crewai.com) for free.
### Q: Does CrewAI support fine-tuning or training custom models?
A: Yes, CrewAI can integrate with custom-trained or fine-tuned models, allowing you to enhance your agents with domain-specific knowledge and accuracy.
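For example, a hosted fine-tuned model can typically be referenced by its model identifier when configuring an agent's LLM (the identifier below is a placeholder):
```python
from crewai import LLM, Agent

# Placeholder identifier for a fine-tuned model; substitute your own
custom_llm = LLM(model="ft:gpt-4o-mini-2024-07-18:my-org::abc123")

analyst = Agent(
    role="Domain Analyst",
    goal="Apply domain-specific knowledge",
    backstory="Backed by a fine-tuned model",
    llm=custom_llm,
)
```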
### Q: Can CrewAI agents interact with external tools and APIs?
A: Absolutely! CrewAI agents can easily integrate with external tools, APIs, and databases, empowering them to leverage real-world data and resources.
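A minimal custom tool sketch (using the `BaseTool` interface from `crewai.tools`; the REST endpoint is hypothetical):
```python
import urllib.request

from crewai.tools import BaseTool

class WeatherTool(BaseTool):
    name: str = "Weather lookup"
    description: str = "Fetches current weather for a city from a hypothetical REST API."

    def _run(self, city: str) -> str:
        # Hypothetical endpoint, used purely for illustration
        with urllib.request.urlopen(f"https://api.example.com/weather?city={city}") as resp:
            return resp.read().decode()

# Pass the tool to an agent via tools=[WeatherTool()]
```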
### Q: Is CrewAI suitable for production environments?
A: Yes, CrewAI is explicitly designed with production-grade standards, ensuring reliability, stability, and scalability for enterprise deployments.
### Q: How scalable is CrewAI?
A: CrewAI is highly scalable, supporting simple automations and large-scale enterprise workflows involving numerous agents and complex tasks simultaneously.
### Q: Does CrewAI offer debugging and monitoring tools?
A: Yes, CrewAI Enterprise includes advanced debugging, tracing, and real-time observability features, simplifying the management and troubleshooting of your automations.
### Q: What programming languages does CrewAI support?
A: CrewAI is primarily Python-based but easily integrates with services and APIs written in any programming language through its flexible API integration capabilities.
### Q: Does CrewAI offer educational resources for beginners?
A: Yes, CrewAI provides extensive beginner-friendly tutorials, courses, and documentation through learn.crewai.com, supporting developers at all skill levels.
### Q: Can CrewAI automate human-in-the-loop workflows?
A: Yes, CrewAI fully supports human-in-the-loop workflows, allowing seamless collaboration between human experts and AI agents for enhanced decision-making.
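A minimal sketch of adding a human checkpoint to a task (assuming the `human_input` flag on `Task`):
```python
from crewai import Agent, Crew, Task

editor = Agent(role="Editor", goal="Review drafts", backstory="A detail-oriented editor")

# Pause for human review before the task output is accepted
review_task = Task(
    description="Review the draft article for tone and accuracy",
    expected_output="An approved, edited article",
    agent=editor,
    human_input=True,
)

crew = Crew(agents=[editor], tasks=[review_task])
```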


@@ -1,152 +0,0 @@
---
title: RAG Tool
description: The `RagTool` is a dynamic knowledge base tool for answering questions using Retrieval-Augmented Generation.
icon: vector-square
mode: "wide"
---
# `RagTool`
## Description
The `RagTool` is designed to answer questions by leveraging the power of Retrieval-Augmented Generation (RAG) through CrewAI's native RAG system.
It provides a dynamic knowledge base that can be queried to retrieve relevant information from various data sources.
This tool is particularly useful for applications that require access to a vast array of information and need to provide contextually relevant answers.
## Example
The following example demonstrates how to initialize the tool and use it with different data sources:
```python Code
from crewai import Agent
from crewai.project import agent
from crewai_tools import RagTool
# Create a RAG tool with default settings
rag_tool = RagTool()
# Add content from a file
rag_tool.add(data_type="file", path="path/to/your/document.pdf")
# Add content from a web page
rag_tool.add(data_type="web_page", url="https://example.com")
# Define an agent with the RagTool (this method lives inside a @CrewBase crew class)
@agent
def knowledge_expert(self) -> Agent:
'''
This agent uses the RagTool to answer questions about the knowledge base.
'''
return Agent(
config=self.agents_config["knowledge_expert"],
allow_delegation=False,
tools=[rag_tool]
)
```
## Supported Data Sources
The `RagTool` can be used with a wide variety of data sources, including:
- 📰 PDF files
- 📊 CSV files
- 📃 JSON files
- 📝 Text
- 📁 Directories/Folders
- 🌐 HTML Web pages
- 📽️ YouTube Channels
- 📺 YouTube Videos
- 📚 Documentation websites
- 📝 MDX files
- 📄 DOCX files
- 🧾 XML files
- 📬 Gmail
- 📝 GitHub repositories
- 🐘 PostgreSQL databases
- 🐬 MySQL databases
- 🤖 Slack conversations
- 💬 Discord messages
- 🗨️ Discourse forums
- 📝 Substack newsletters
- 🐝 Beehiiv content
- 💾 Dropbox files
- 🖼️ Images
- ⚙️ Custom data sources
## Parameters
The `RagTool` accepts the following parameters:
- **summarize**: Optional. Whether to summarize the retrieved content. Default is `False`.
- **adapter**: Optional. A custom adapter for the knowledge base. If not provided, a `CrewAIRagAdapter` will be used.
- **config**: Optional. Configuration for the underlying CrewAI RAG system.
## Adding Content
You can add content to the knowledge base using the `add` method:
```python Code
# Add a PDF file
rag_tool.add(data_type="file", path="path/to/your/document.pdf")
# Add a web page
rag_tool.add(data_type="web_page", url="https://example.com")
# Add a YouTube video
rag_tool.add(data_type="youtube_video", url="https://www.youtube.com/watch?v=VIDEO_ID")
# Add a directory of files
rag_tool.add(data_type="directory", path="path/to/your/directory")
```
## Agent Integration Example
Here's how to integrate the `RagTool` with a CrewAI agent:
```python Code
from crewai import Agent
from crewai.project import agent
from crewai_tools import RagTool
# Initialize the tool and add content
rag_tool = RagTool()
rag_tool.add(data_type="web_page", url="https://docs.crewai.com")
rag_tool.add(data_type="file", path="company_data.pdf")
# Define an agent with the RagTool
@agent
def knowledge_expert(self) -> Agent:
return Agent(
config=self.agents_config["knowledge_expert"],
allow_delegation=False,
tools=[rag_tool]
)
```
## Advanced Configuration
You can customize the behavior of the `RagTool` by providing a configuration dictionary:
```python Code
from crewai_tools import RagTool
# Create a RAG tool with custom configuration
config = {
"vectordb": {
"provider": "qdrant",
"config": {
"collection_name": "my-collection"
}
},
"embedding_model": {
"provider": "openai",
"config": {
"model": "text-embedding-3-small"
}
}
}
rag_tool = RagTool(config=config, summarize=True)
```
## Conclusion
The `RagTool` provides a powerful way to create and query knowledge bases from various data sources. By leveraging Retrieval-Augmented Generation, it enables agents to access and retrieve relevant information efficiently, enhancing their ability to provide accurate and contextually appropriate responses.


@@ -1,131 +0,0 @@
[project]
name = "crewai-core"
dynamic = ["version"]
description = "Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks."
readme = "README.md"
requires-python = ">=3.10,<3.14"
authors = [
{ name = "Joao Moura", email = "joao@crewai.com" }
]
dependencies = [
# Core Dependencies
"pydantic>=2.6.1",
"openai>=1.13.3",
"litellm==1.74.9",
"instructor>=1.3.3",
# Text Processing
"pdfplumber>=0.11.4",
"regex>=2024.9.11",
# Telemetry and Monitoring
"opentelemetry-api>=1.30.0",
"opentelemetry-sdk>=1.30.0",
"opentelemetry-exporter-otlp-proto-http>=1.30.0",
# Data Handling
"chromadb>=0.5.23",
"tokenizers>=0.20.3",
"onnxruntime==1.22.0",
"openpyxl>=3.1.5",
"pyvis>=0.3.2",
# Authentication and Security
"python-dotenv>=1.0.0",
"pyjwt>=2.9.0",
# Configuration and Utils
"click>=8.1.8",
"appdirs>=1.4.4",
"jsonref>=1.1.0",
"json-repair==0.25.2",
"uv>=0.4.25",
"tomli-w>=1.1.0",
"tomli>=2.0.2",
"blinker>=1.9.0",
"json5>=0.10.0",
"portalocker==2.7.0",
]
[project.optional-dependencies]
tools = ["crewai-tools"]
embeddings = [
"tiktoken~=0.8.0"
]
pdfplumber = [
"pdfplumber>=0.11.4",
]
pandas = [
"pandas>=2.2.3",
]
openpyxl = [
"openpyxl>=3.1.5",
]
mem0 = ["mem0ai>=0.1.94"]
docling = [
"docling>=2.12.0",
]
aisuite = [
"aisuite>=0.1.10",
]
qdrant = [
"qdrant-client[fastembed]>=1.14.3",
]
[project.scripts]
crewai = "crewai.cli.cli:crewai"
[tool.ruff]
exclude = [
"src/crewai/cli/templates",
]
fix = true
[tool.ruff.lint]
select = [
"E", # pycodestyle errors (style issues)
"F", # Pyflakes (code errors)
"B", # flake8-bugbear (bug prevention)
"S", # bandit (security issues)
"RUF", # ruff-specific rules
"N", # pep8-naming (naming conventions)
"W", # pycodestyle warnings
"PERF", # performance issues
"PIE", # flake8-pie (unnecessary code)
"ASYNC", # async/await best practices
"RET", # flake8-return (return improvements)
"UP006", # use collections.abc
"UP007", # use X | Y for unions
"UP035", # use dict/list instead of typing.Dict/List
"UP037", # remove quotes from type annotations
"UP045", # use X | None instead of Optional[X]
"UP004", # use isinstance instead of type
"UP008", # use super() instead of super(Class, self)
"UP010", # use isinstance for type checks
"UP018", # use str() instead of "string"
"UP031", # use f-strings for .format()
"UP032", # use f-strings for .format() with positional
"I001", # sort imports
"I002", # remove unused imports
]
ignore = ["E501"] # ignore line too long globally
[tool.ruff.lint.per-file-ignores]
"tests/**/*.py" = ["S101", "RET504"] # Allow assert statements and unnecessary assignments before return in tests
[tool.mypy]
exclude = ["src/crewai/cli/templates", "tests/"]
[tool.bandit]
exclude_dirs = ["src/crewai/cli/templates"]
[tool.pytest.ini_options]
testpaths = ["tests"]
markers = [
"telemetry: mark test as a telemetry test (don't mock telemetry)",
]
[tool.hatch.version]
path = "src/crewai/__init__.py"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/crewai"]


@@ -1,243 +0,0 @@
import os
from typing import Any, cast
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
class EmbeddingConfigurator:
def __init__(self):
self.embedding_functions = {
"openai": self._configure_openai,
"azure": self._configure_azure,
"ollama": self._configure_ollama,
"vertexai": self._configure_vertexai,
"google": self._configure_google,
"cohere": self._configure_cohere,
"voyageai": self._configure_voyageai,
"bedrock": self._configure_bedrock,
"huggingface": self._configure_huggingface,
"watson": self._configure_watson,
"custom": self._configure_custom,
}
def configure_embedder(
self,
embedder_config: dict[str, Any] | None = None,
) -> EmbeddingFunction:
"""Configures and returns an embedding function based on the provided config."""
if embedder_config is None:
return self._create_default_embedding_function()
provider = embedder_config.get("provider")
config = embedder_config.get("config", {})
model_name = config.get("model") if provider != "custom" else None
if provider not in self.embedding_functions:
raise Exception(
f"Unsupported embedding provider: {provider}, supported providers: {list(self.embedding_functions.keys())}"
)
try:
embedding_function = self.embedding_functions[provider]
except ImportError as e:
missing_package = str(e).split()[-1]
raise ImportError(
f"{missing_package} is not installed. Please install it with: pip install {missing_package}"
) from e
return (
embedding_function(config)
if provider == "custom"
else embedding_function(config, model_name)
)
@staticmethod
def _create_default_embedding_function():
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
@staticmethod
def _configure_openai(config, model_name):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
model_name=model_name,
api_base=config.get("api_base", None),
api_type=config.get("api_type", None),
api_version=config.get("api_version", None),
default_headers=config.get("default_headers", None),
dimensions=config.get("dimensions", None),
deployment_id=config.get("deployment_id", None),
organization_id=config.get("organization_id", None),
)
@staticmethod
def _configure_azure(config, model_name):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key"),
api_base=config.get("api_base"),
api_type=config.get("api_type", "azure"),
api_version=config.get("api_version"),
model_name=model_name,
default_headers=config.get("default_headers"),
dimensions=config.get("dimensions"),
deployment_id=config.get("deployment_id"),
organization_id=config.get("organization_id"),
)
@staticmethod
def _configure_ollama(config, model_name):
from chromadb.utils.embedding_functions.ollama_embedding_function import (
OllamaEmbeddingFunction,
)
return OllamaEmbeddingFunction(
url=config.get("url", "http://localhost:11434/api/embeddings"),
model_name=model_name,
)
@staticmethod
def _configure_vertexai(config, model_name):
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleVertexEmbeddingFunction,
)
return GoogleVertexEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
project_id=config.get("project_id"),
region=config.get("region"),
)
@staticmethod
def _configure_google(config, model_name):
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleGenerativeAiEmbeddingFunction,
)
return GoogleGenerativeAiEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
task_type=config.get("task_type"),
)
@staticmethod
def _configure_cohere(config, model_name):
from chromadb.utils.embedding_functions.cohere_embedding_function import (
CohereEmbeddingFunction,
)
return CohereEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
@staticmethod
def _configure_voyageai(config, model_name):
from chromadb.utils.embedding_functions.voyageai_embedding_function import (
VoyageAIEmbeddingFunction,
)
return VoyageAIEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
@staticmethod
def _configure_bedrock(config, model_name):
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import (
AmazonBedrockEmbeddingFunction,
)
# Allow custom model_name override with backwards compatibility
kwargs = {"session": config.get("session")}
if model_name is not None:
kwargs["model_name"] = model_name
return AmazonBedrockEmbeddingFunction(**kwargs)
@staticmethod
def _configure_huggingface(config, model_name):
from chromadb.utils.embedding_functions.huggingface_embedding_function import (
HuggingFaceEmbeddingServer,
)
return HuggingFaceEmbeddingServer(
url=config.get("api_url"),
)
@staticmethod
def _configure_watson(config, model_name):
try:
import ibm_watsonx_ai.foundation_models as watson_models
from ibm_watsonx_ai import Credentials
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames as EmbedParams
except ImportError as e:
raise ImportError(
"IBM Watson dependencies are not installed. Please install them to use Watson embedding."
) from e
class WatsonEmbeddingFunction(EmbeddingFunction):
def __call__(self, input: Documents) -> Embeddings:
if isinstance(input, str):
input = [input]
embed_params = {
EmbedParams.TRUNCATE_INPUT_TOKENS: 3,
EmbedParams.RETURN_OPTIONS: {"input_text": True},
}
embedding = watson_models.Embeddings(
model_id=config.get("model"),
params=embed_params,
credentials=Credentials(
api_key=config.get("api_key"), url=config.get("api_url")
),
project_id=config.get("project_id"),
)
try:
embeddings = embedding.embed_documents(input)
return cast(Embeddings, embeddings)
except Exception as e:
print("Error during Watson embedding:", e)
raise e
return WatsonEmbeddingFunction()
@staticmethod
def _configure_custom(config):
custom_embedder = config.get("embedder")
if isinstance(custom_embedder, EmbeddingFunction):
try:
validate_embedding_function(custom_embedder)
return custom_embedder
except Exception as e:
raise ValueError(f"Invalid custom embedding function: {e!s}") from e
elif callable(custom_embedder):
try:
instance = custom_embedder()
if isinstance(instance, EmbeddingFunction):
validate_embedding_function(instance)
return instance
raise ValueError(
"Custom embedder does not create an EmbeddingFunction instance"
)
except Exception as e:
raise ValueError(f"Error instantiating custom embedder: {e!s}") from e
else:
raise ValueError(
"Custom embedder must be an instance of `EmbeddingFunction` or a callable that creates one"
)


@@ -1,148 +0,0 @@
"""Minimal embedding function factory for CrewAI."""
import os
from chromadb import EmbeddingFunction
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import (
AmazonBedrockEmbeddingFunction,
)
from chromadb.utils.embedding_functions.cohere_embedding_function import (
CohereEmbeddingFunction,
)
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleGenerativeAiEmbeddingFunction,
GooglePalmEmbeddingFunction,
GoogleVertexEmbeddingFunction,
)
from chromadb.utils.embedding_functions.huggingface_embedding_function import (
HuggingFaceEmbeddingFunction,
)
from chromadb.utils.embedding_functions.instructor_embedding_function import (
InstructorEmbeddingFunction,
)
from chromadb.utils.embedding_functions.jina_embedding_function import (
JinaEmbeddingFunction,
)
from chromadb.utils.embedding_functions.ollama_embedding_function import (
OllamaEmbeddingFunction,
)
from chromadb.utils.embedding_functions.onnx_mini_lm_l6_v2 import ONNXMiniLM_L6_V2
from chromadb.utils.embedding_functions.open_clip_embedding_function import (
OpenCLIPEmbeddingFunction,
)
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
from chromadb.utils.embedding_functions.roboflow_embedding_function import (
RoboflowEmbeddingFunction,
)
from chromadb.utils.embedding_functions.sentence_transformer_embedding_function import (
SentenceTransformerEmbeddingFunction,
)
from chromadb.utils.embedding_functions.text2vec_embedding_function import (
Text2VecEmbeddingFunction,
)
from crewai.rag.embeddings.types import EmbeddingOptions
def get_embedding_function(
config: EmbeddingOptions | dict | None = None,
) -> EmbeddingFunction:
"""Get embedding function - delegates to ChromaDB.
Args:
config: Optional configuration - either an EmbeddingOptions object or a dict with:
- provider: The embedding provider to use (default: "openai")
- Any other provider-specific parameters
Returns:
EmbeddingFunction instance ready for use with ChromaDB
Supported providers:
- openai: OpenAI embeddings
- cohere: Cohere embeddings
- ollama: Ollama local embeddings
- huggingface: HuggingFace embeddings
- sentence-transformer: Local sentence transformers
- instructor: Instructor embeddings for specialized tasks
- google-palm: Google PaLM embeddings
- google-generativeai: Google Generative AI embeddings
- google-vertex: Google Vertex AI embeddings
- amazon-bedrock: AWS Bedrock embeddings
- jina: Jina AI embeddings
- roboflow: Roboflow embeddings for vision tasks
- openclip: OpenCLIP embeddings for multimodal tasks
- text2vec: Text2Vec embeddings
- onnx: ONNX MiniLM-L6-v2 (no API key needed, included with ChromaDB)
Examples:
# Use default OpenAI embedding
>>> embedder = get_embedding_function()
# Use Cohere with dict
>>> embedder = get_embedding_function({
... "provider": "cohere",
... "api_key": "your-key",
... "model_name": "embed-english-v3.0"
... })
# Use with EmbeddingOptions
>>> embedder = get_embedding_function(
... EmbeddingOptions(provider="sentence-transformer", model_name="all-MiniLM-L6-v2")
... )
# Use local sentence transformers (no API key needed)
>>> embedder = get_embedding_function({
... "provider": "sentence-transformer",
... "model_name": "all-MiniLM-L6-v2"
... })
# Use Ollama for local embeddings
>>> embedder = get_embedding_function({
... "provider": "ollama",
... "model_name": "nomic-embed-text"
... })
# Use ONNX (no API key needed)
>>> embedder = get_embedding_function({
... "provider": "onnx"
... })
"""
if config is None:
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
# Handle EmbeddingOptions object
if isinstance(config, EmbeddingOptions):
config_dict = config.model_dump(exclude_none=True)
else:
config_dict = config.copy()
provider = config_dict.pop("provider", "openai")
embedding_functions = {
"openai": OpenAIEmbeddingFunction,
"cohere": CohereEmbeddingFunction,
"ollama": OllamaEmbeddingFunction,
"huggingface": HuggingFaceEmbeddingFunction,
"sentence-transformer": SentenceTransformerEmbeddingFunction,
"instructor": InstructorEmbeddingFunction,
"google-palm": GooglePalmEmbeddingFunction,
"google-generativeai": GoogleGenerativeAiEmbeddingFunction,
"google-vertex": GoogleVertexEmbeddingFunction,
"amazon-bedrock": AmazonBedrockEmbeddingFunction,
"jina": JinaEmbeddingFunction,
"roboflow": RoboflowEmbeddingFunction,
"openclip": OpenCLIPEmbeddingFunction,
"text2vec": Text2VecEmbeddingFunction,
"onnx": ONNXMiniLM_L6_V2,
}
if provider not in embedding_functions:
raise ValueError(
f"Unsupported provider: {provider}. "
f"Available providers: {list(embedding_functions.keys())}"
)
return embedding_functions[provider](**config_dict)


@@ -1,62 +0,0 @@
"""Type definitions for the embeddings module."""
from typing import Literal
from pydantic import BaseModel, Field, SecretStr
from crewai.rag.types import EmbeddingFunction
EmbeddingProvider = Literal[
"openai",
"cohere",
"ollama",
"huggingface",
"sentence-transformer",
"instructor",
"google-palm",
"google-generativeai",
"google-vertex",
"amazon-bedrock",
"jina",
"roboflow",
"openclip",
"text2vec",
"onnx",
]
"""Supported embedding providers.
These correspond to the embedding functions available in ChromaDB's
embedding_functions module. Each provider has specific requirements
and configuration options.
"""
class EmbeddingOptions(BaseModel):
"""Configuration options for embedding providers.
Generic attributes that can be passed to get_embedding_function
to configure various embedding providers.
"""
provider: EmbeddingProvider = Field(
..., description="Embedding provider name (e.g., 'openai', 'cohere', 'onnx')"
)
model_name: str | None = Field(
default=None, description="Model name for the embedding provider"
)
api_key: SecretStr | None = Field(
default=None, description="API key for the embedding provider"
)
class EmbeddingConfig(BaseModel):
"""Configuration wrapper for embedding functions.
Accepts either a pre-configured EmbeddingFunction or EmbeddingOptions
to create one. This provides flexibility in how embeddings are configured.
Attributes:
function: Either a callable EmbeddingFunction or EmbeddingOptions to create one
"""
function: EmbeddingFunction | EmbeddingOptions


@@ -1,26 +0,0 @@
from .converter import Converter, ConverterError
from .exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from .file_handler import FileHandler
from .i18n import I18N
from .internal_instructor import InternalInstructor
from .logger import Logger
from .parser import YamlParser
from .printer import Printer
from .prompts import Prompts
from .rpm_controller import RPMController
__all__ = [
"I18N",
"Converter",
"ConverterError",
"FileHandler",
"InternalInstructor",
"LLMContextLengthExceededException",
"Logger",
"Printer",
"Prompts",
"RPMController",
"YamlParser",
]


@@ -1,19 +0,0 @@
TRAINING_DATA_FILE = "training_data.pkl"
TRAINED_AGENTS_DATA_FILE = "trained_agents_data.pkl"
DEFAULT_SCORE_THRESHOLD = 0.35
KNOWLEDGE_DIRECTORY = "knowledge"
MAX_LLM_RETRY = 3
MAX_FILE_NAME_LENGTH = 255
EMITTER_COLOR = "bold_blue"
class _NotSpecified:
def __repr__(self):
return "NOT_SPECIFIED"
# Sentinel value used to detect when no value has been explicitly provided.
# Unlike `None`, which might be a valid value from the user, `NOT_SPECIFIED` allows
# us to distinguish between "not passed at all" and "explicitly passed None" or "[]".
NOT_SPECIFIED = _NotSpecified()
CREWAI_BASE_URL = "https://app.crewai.com"


@@ -1,48 +0,0 @@
import json
from typing import Any
import regex
from pydantic import BaseModel, ValidationError
from crewai.agents.parser import OutputParserException
"""Parser for converting text outputs into Pydantic models."""
class CrewPydanticOutputParser:
"""Parses text outputs into specified Pydantic models."""
pydantic_object: type[BaseModel]
def parse_result(self, result: str) -> Any:
result = self._transform_in_valid_json(result)
# Treating edge case of function calling llm returning the name instead of tool_name
json_object = json.loads(result)
if "tool_name" not in json_object:
json_object["tool_name"] = json_object.get("name", "")
result = json.dumps(json_object)
try:
return self.pydantic_object.model_validate(json_object)
except ValidationError as e:
name = self.pydantic_object.__name__
msg = f"Failed to parse {name} from completion {json_object}. Got: {e}"
raise OutputParserException(error=msg)
def _transform_in_valid_json(self, text) -> str:
text = text.replace("```", "").replace("json", "")
json_pattern = r"\{(?:[^{}]|(?R))*\}"
matches = regex.finditer(json_pattern, text)
for match in matches:
try:
# Attempt to parse the matched string as JSON
json_obj = json.loads(match.group())
# Return the first successfully parsed JSON object
json_obj = json.dumps(json_obj)
return str(json_obj)
except json.JSONDecodeError:
# If parsing fails, skip to the next match
continue
return text


@@ -1,30 +0,0 @@
from typing import ClassVar
class LLMContextLengthExceededException(Exception):
CONTEXT_LIMIT_ERRORS: ClassVar[list[str]] = [
"expected a string with maximum length",
"maximum context length",
"context length exceeded",
"context_length_exceeded",
"context window full",
"too many tokens",
"input is too long",
"exceeds token limit",
]
def __init__(self, error_message: str):
self.original_error_message = error_message
super().__init__(self._get_error_message(error_message))
def _is_context_limit_error(self, error_message: str) -> bool:
return any(
phrase.lower() in error_message.lower()
for phrase in self.CONTEXT_LIMIT_ERRORS
)
def _get_error_message(self, error_message: str):
return (
f"LLM context length exceeded. Original error: {error_message}\n"
"Consider using a smaller input or implementing a text splitting strategy."
)


@@ -1,118 +0,0 @@
import json
import os
import pickle
from datetime import datetime
class FileHandler:
"""Handler for file operations supporting both JSON and text-based logging.
Args:
file_path (Union[bool, str]): Path to the log file or boolean flag
"""
def __init__(self, file_path: bool | str):
self._initialize_path(file_path)
def _initialize_path(self, file_path: bool | str):
if file_path is True: # File path is boolean True
self._path = os.path.join(os.curdir, "logs.txt")
elif isinstance(file_path, str): # File path is a string
if file_path.endswith((".json", ".txt")):
self._path = (
file_path # No modification if the file ends with .json or .txt
)
else:
self._path = (
file_path + ".txt"
) # Append .txt if the file doesn't end with .json or .txt
else:
raise ValueError(
"file_path must be a string or boolean."
) # Handle the case where file_path isn't valid
def log(self, **kwargs):
try:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = {"timestamp": now, **kwargs}
if self._path.endswith(".json"):
# Append log in JSON format
with open(self._path, "a", encoding="utf-8") as file:
# If the file is empty, start with a list; else, append to it
try:
# Try reading existing content to avoid overwriting
with open(self._path, "r", encoding="utf-8") as read_file:
existing_data = json.load(read_file)
existing_data.append(log_entry)
except (json.JSONDecodeError, FileNotFoundError):
# If no valid JSON or file doesn't exist, start with an empty list
existing_data = [log_entry]
with open(self._path, "w", encoding="utf-8") as write_file:
json.dump(existing_data, write_file, indent=4)
write_file.write("\n")
else:
# Append log in plain text format
message = (
f"{now}: "
+ ", ".join([f'{key}="{value}"' for key, value in kwargs.items()])
+ "\n"
)
with open(self._path, "a", encoding="utf-8") as file:
file.write(message)
except Exception as e:
raise ValueError(f"Failed to log message: {e!s}")
class PickleHandler:
def __init__(self, file_name: str) -> None:
"""
Initialize the PickleHandler with the name of the file where data will be stored.
The file will be saved in the current directory.
Parameters:
- file_name (str): The name of the file for saving and loading data.
"""
if not file_name.endswith(".pkl"):
file_name += ".pkl"
self.file_path = os.path.join(os.getcwd(), file_name)
def initialize_file(self) -> None:
"""
Initialize the file with an empty dictionary and overwrite any existing data.
"""
self.save({})
def save(self, data) -> None:
"""
Save the data to the specified file using pickle.
Parameters:
- data (object): The data to be saved.
"""
with open(self.file_path, "wb") as file:
pickle.dump(data, file)
def load(self) -> dict:
"""
Load the data from the specified file using pickle.
Returns:
- dict: The data loaded from the file.
"""
if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
return {} # Return an empty dictionary if the file does not exist or is empty
with open(self.file_path, "rb") as file:
try:
return pickle.load(file) # nosec
except EOFError:
return {} # Return an empty dictionary if the file is empty or corrupted
except Exception:
raise # Raise any other exceptions that occur during loading


@@ -1,27 +0,0 @@
from typing import TYPE_CHECKING
from crewai.utilities.constants import _NotSpecified
if TYPE_CHECKING:
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
def aggregate_raw_outputs_from_task_outputs(task_outputs: list["TaskOutput"]) -> str:
"""Generate string context from the task outputs."""
dividers = "\n\n----------\n\n"
# Join task outputs with dividers
return dividers.join(output.raw for output in task_outputs)
def aggregate_raw_outputs_from_tasks(tasks: list["Task"] | _NotSpecified) -> str:
"""Generate string context from the tasks."""
task_outputs = (
[task.output for task in tasks if task.output is not None]
if isinstance(tasks, list)
else []
)
return aggregate_raw_outputs_from_task_outputs(task_outputs)


@@ -1,54 +0,0 @@
import json
import os
from pydantic import BaseModel, Field, PrivateAttr, model_validator
"""Internationalization support for CrewAI prompts and messages."""
class I18N(BaseModel):
"""Handles loading and retrieving internationalized prompts."""
_prompts: dict[str, dict[str, str]] = PrivateAttr()
prompt_file: str | None = Field(
default=None,
description="Path to the prompt_file file to load",
)
@model_validator(mode="after")
def load_prompts(self) -> "I18N":
"""Load prompts from a JSON file."""
try:
if self.prompt_file:
with open(self.prompt_file, "r", encoding="utf-8") as f:
self._prompts = json.load(f)
else:
dir_path = os.path.dirname(os.path.realpath(__file__))
prompts_path = os.path.join(dir_path, "../translations/en.json")
with open(prompts_path, "r", encoding="utf-8") as f:
self._prompts = json.load(f)
except FileNotFoundError:
raise Exception(f"Prompt file '{self.prompt_file}' not found.")
except json.JSONDecodeError:
raise Exception("Error decoding JSON from the prompts file.")
if not self._prompts:
self._prompts = {}
return self
def slice(self, slice: str) -> str:
return self.retrieve("slices", slice)
def errors(self, error: str) -> str:
return self.retrieve("errors", error)
def tools(self, tool: str) -> str | dict[str, str]:
return self.retrieve("tools", tool)
def retrieve(self, kind, key) -> str:
try:
return self._prompts[kind][key]
except Exception as _:
raise Exception(f"Prompt for '{kind}':'{key}' not found.")


@@ -1,30 +0,0 @@
"""Import utilities for optional dependencies."""
import importlib
from types import ModuleType
class OptionalDependencyError(ImportError):
"""Exception raised when an optional dependency is not installed."""
def require(name: str, *, purpose: str) -> ModuleType:
"""Import a module, raising a helpful error if it's not installed.
Args:
name: The module name to import.
purpose: Description of what requires this dependency.
Returns:
The imported module.
Raises:
OptionalDependencyError: If the module is not installed.
"""
try:
return importlib.import_module(name)
except ImportError as exc:
raise OptionalDependencyError(
f"{purpose} requires the optional dependency '{name}'.\n"
f"Install it with: uv add {name}"
) from exc


@@ -1,42 +0,0 @@
import warnings
from typing import Any
class InternalInstructor:
"""Class that wraps an agent llm with instructor."""
def __init__(
self,
content: str,
model: type,
agent: Any | None = None,
llm: str | None = None,
):
self.content = content
self.agent = agent
self.llm = llm
self.model = model
self._client = None
self.set_instructor()
def set_instructor(self):
"""Set instructor."""
if self.agent and not self.llm:
self.llm = self.agent.function_calling_llm or self.agent.llm
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
import instructor
from litellm import completion
self._client = instructor.from_litellm(completion)
def to_json(self):
model = self.to_pydantic()
return model.model_dump_json(indent=2)
def to_pydantic(self):
messages = [{"role": "user", "content": self.content}]
return self._client.chat.completions.create(
model=self.llm.model, response_model=self.model, messages=messages
)


@@ -1,20 +0,0 @@
from datetime import datetime
from pydantic import BaseModel, Field, PrivateAttr
from crewai.utilities.printer import Printer
class Logger(BaseModel):
verbose: bool = Field(default=False)
_printer: Printer = PrivateAttr(default_factory=Printer)
default_color: str = Field(default="bold_yellow")
def log(self, level, message, color=None):
if color is None:
color = self.default_color
if self.verbose:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self._printer.print(
f"\n[{timestamp}][{level.upper()}]: {message}", color=color
)


@@ -1,31 +0,0 @@
import re
class YamlParser:
@staticmethod
def parse(file):
"""
Parses a YAML file, modifies specific patterns, and checks for unsupported 'context' usage.
Args:
file (file object): The YAML file to parse.
Returns:
str: The modified content of the YAML file.
Raises:
ValueError: If 'context:' is used incorrectly.
"""
content = file.read()
# Replace single { and } with doubled ones, while leaving already doubled ones intact and the other special characters {# and {%
modified_content = re.sub(r"(?<!\{){(?!\{)(?!\#)(?!\%)", "{{", content)
modified_content = re.sub(
r"(?<!\})(?<!\%)(?<!\#)\}(?!})", "}}", modified_content
)
# Check for 'context:' not followed by '[' and raise an error
if re.search(r"context:(?!\s*\[)", modified_content):
raise ValueError(
"Context is currently only supported in code when creating a task. "
"Please use the 'context' key in the task configuration."
)
return modified_content


@@ -1,69 +0,0 @@
"""Utility for colored console output."""
class Printer:
"""Handles colored console output formatting."""
def print(self, content: str, color: str | None = None):
if color == "purple":
self._print_purple(content)
elif color == "red":
self._print_red(content)
elif color == "bold_green":
self._print_bold_green(content)
elif color == "bold_purple":
self._print_bold_purple(content)
elif color == "bold_blue":
self._print_bold_blue(content)
elif color == "yellow":
self._print_yellow(content)
elif color == "bold_yellow":
self._print_bold_yellow(content)
elif color == "cyan":
self._print_cyan(content)
elif color == "bold_cyan":
self._print_bold_cyan(content)
elif color == "magenta":
self._print_magenta(content)
elif color == "bold_magenta":
self._print_bold_magenta(content)
elif color == "green":
self._print_green(content)
else:
print(content)
def _print_bold_purple(self, content):
print(f"\033[1m\033[95m {content}\033[00m")
def _print_bold_green(self, content):
print(f"\033[1m\033[92m {content}\033[00m")
def _print_purple(self, content):
print(f"\033[95m {content}\033[00m")
def _print_red(self, content):
print(f"\033[91m {content}\033[00m")
def _print_bold_blue(self, content):
print(f"\033[1m\033[94m {content}\033[00m")
def _print_yellow(self, content):
print(f"\033[93m {content}\033[00m")
def _print_bold_yellow(self, content):
print(f"\033[1m\033[93m {content}\033[00m")
def _print_cyan(self, content):
print(f"\033[96m {content}\033[00m")
def _print_bold_cyan(self, content):
print(f"\033[1m\033[96m {content}\033[00m")
def _print_magenta(self, content):
print(f"\033[35m {content}\033[00m")
def _print_bold_magenta(self, content):
print(f"\033[1m\033[35m {content}\033[00m")
def _print_green(self, content):
print(f"\033[32m {content}\033[00m")


@@ -1,82 +0,0 @@
from typing import Any
from pydantic import BaseModel, Field
from crewai.utilities import I18N
class Prompts(BaseModel):
"""Manages and generates prompts for a generic agent."""
i18n: I18N = Field(default=I18N())
has_tools: bool = False
system_template: str | None = None
prompt_template: str | None = None
response_template: str | None = None
use_system_prompt: bool | None = False
agent: Any
def task_execution(self) -> dict[str, str]:
"""Generate a standard prompt for task execution."""
slices = ["role_playing"]
if self.has_tools:
slices.append("tools")
else:
slices.append("no_tools")
system = self._build_prompt(slices)
slices.append("task")
if (
not self.system_template
and not self.prompt_template
and self.use_system_prompt
):
return {
"system": system,
"user": self._build_prompt(["task"]),
"prompt": self._build_prompt(slices),
}
return {
"prompt": self._build_prompt(
slices,
self.system_template,
self.prompt_template,
self.response_template,
)
}
def _build_prompt(
self,
components: list[str],
system_template=None,
prompt_template=None,
response_template=None,
) -> str:
"""Constructs a prompt string from specified components."""
if not system_template or not prompt_template:
# If any of the required templates are missing, fall back to the default format
prompt_parts = [self.i18n.slice(component) for component in components]
prompt = "".join(prompt_parts)
else:
# All templates are provided, use them
prompt_parts = [
self.i18n.slice(component)
for component in components
if component != "task"
]
system = system_template.replace("{{ .System }}", "".join(prompt_parts))
prompt = prompt_template.replace(
"{{ .Prompt }}", "".join(self.i18n.slice("task"))
)
# Handle missing response_template
if response_template:
response = response_template.split("{{ .Response }}")[0]
prompt = f"{system}\n{prompt}\n{response}"
else:
prompt = f"{system}\n{prompt}"
return (
prompt.replace("{goal}", self.agent.goal)
.replace("{role}", self.agent.role)
.replace("{backstory}", self.agent.backstory)
)


@@ -1,90 +0,0 @@
from typing import Any
from crewai.agents.parser import AgentAction
from crewai.security import Fingerprint
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities.i18n import I18N
def execute_tool_and_check_finality(
agent_action: AgentAction,
tools: list[CrewStructuredTool],
i18n: I18N,
agent_key: str | None = None,
agent_role: str | None = None,
tools_handler: Any | None = None,
task: Any | None = None,
agent: Any | None = None,
function_calling_llm: Any | None = None,
fingerprint_context: dict[str, str] | None = None,
) -> ToolResult:
"""Execute a tool and check if the result should be treated as a final answer.
Args:
agent_action: The action containing the tool to execute
tools: List of available tools
i18n: Internationalization settings
agent_key: Optional key for event emission
agent_role: Optional role for event emission
tools_handler: Optional tools handler for tool execution
task: Optional task for tool execution
agent: Optional agent instance for tool execution
function_calling_llm: Optional LLM for function calling
Returns:
ToolResult containing the execution result and whether it should be treated as a final answer
"""
try:
tool_name_to_tool_map = {tool.name: tool for tool in tools}
if agent_key and agent_role and agent:
fingerprint_context = fingerprint_context or {}
if agent:
if hasattr(agent, "set_fingerprint") and callable(
agent.set_fingerprint
):
if isinstance(fingerprint_context, dict):
try:
fingerprint_obj = Fingerprint.from_dict(fingerprint_context)
agent.set_fingerprint(fingerprint_obj)
except Exception as e:
raise ValueError(f"Failed to set fingerprint: {e}")
# Create tool usage instance
tool_usage = ToolUsage(
tools_handler=tools_handler,
tools=tools,
function_calling_llm=function_calling_llm,
task=task,
agent=agent,
action=agent_action,
)
# Parse tool calling
tool_calling = tool_usage.parse_tool_calling(agent_action.text)
if isinstance(tool_calling, ToolUsageErrorException):
return ToolResult(tool_calling.message, False)
# Check if tool name matches
if tool_calling.tool_name.casefold().strip() in [
name.casefold().strip() for name in tool_name_to_tool_map
] or tool_calling.tool_name.casefold().replace("_", " ") in [
name.casefold().strip() for name in tool_name_to_tool_map
]:
tool_result = tool_usage.use(tool_calling, agent_action.text)
tool = tool_name_to_tool_map.get(tool_calling.tool_name)
if tool:
return ToolResult(tool_result, tool.result_as_answer)
# Handle invalid tool name
tool_result = i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in tools]),
)
return ToolResult(tool_result, False)
except Exception as e:
raise e


@@ -1,96 +0,0 @@
import json
import re
from typing import Any, get_origin
from pydantic import BaseModel, ValidationError
from crewai.utilities.converter import Converter, ConverterError
class TrainingConverter(Converter):
"""
A specialized converter for smaller LLMs (up to 7B parameters) that handles validation errors
by breaking down the model into individual fields and querying the LLM for each field separately.
"""
def to_pydantic(self, current_attempt=1) -> BaseModel:
try:
return super().to_pydantic(current_attempt)
except ConverterError:
return self._convert_field_by_field()
def _convert_field_by_field(self) -> BaseModel:
field_values = {}
for field_name, field_info in self.model.model_fields.items():
field_description = field_info.description
field_type = field_info.annotation
response = self._ask_llm_for_field(field_name, field_description)
value = self._process_field_value(response, field_type)
field_values[field_name] = value
try:
return self.model(**field_values)
except ValidationError as e:
raise ConverterError(
f"Failed to create model from individually collected fields: {e}"
)
def _ask_llm_for_field(self, field_name: str, field_description: str) -> str:
prompt = f"""
Based on the following information:
{self.text}
Please provide ONLY the {field_name} field value as described:
"{field_description}"
Respond with ONLY the requested information, nothing else.
"""
return self.llm.call(
[
{
"role": "system",
"content": f"Extract the {field_name} from the previous information.",
},
{"role": "user", "content": prompt},
]
)
def _process_field_value(self, response: str, field_type: Any) -> Any:
response = response.strip()
origin = get_origin(field_type)
if origin is list:
return self._parse_list(response)
if field_type is float:
return self._parse_float(response)
if field_type is str:
return response
return response
def _parse_list(self, response: str) -> list:
try:
if response.startswith("["):
return json.loads(response)
items = [item.strip() for item in response.split("\n") if item.strip()]
return [self._strip_bullet(item) for item in items]
except json.JSONDecodeError:
return [response]
def _parse_float(self, response: str) -> float:
try:
match = re.search(r"(\d+(\.\d+)?)", response)
return float(match.group(1)) if match else 0.0
except Exception:
return 0.0
def _strip_bullet(self, item: str) -> str:
if item.startswith(("- ", "* ")):
return item[2:].strip()
return item.strip()


@@ -1,95 +0,0 @@
"""Tests for ChromaDB utility functions."""
from crewai.rag.chromadb.utils import (
MAX_COLLECTION_LENGTH,
MIN_COLLECTION_LENGTH,
_is_ipv4_pattern,
_sanitize_collection_name,
)
class TestChromaDBUtils:
"""Test suite for ChromaDB utility functions."""
def test_sanitize_collection_name_long_name(self) -> None:
"""Test sanitizing a very long collection name."""
long_name = "This is an extremely long role name that will definitely exceed the ChromaDB collection name limit of 63 characters and cause an error when used as a collection name"
sanitized = _sanitize_collection_name(long_name)
assert len(sanitized) <= MAX_COLLECTION_LENGTH
assert sanitized[0].isalnum()
assert sanitized[-1].isalnum()
assert all(c.isalnum() or c in ["_", "-"] for c in sanitized)
def test_sanitize_collection_name_special_chars(self) -> None:
"""Test sanitizing a name with special characters."""
special_chars = "Agent@123!#$%^&*()"
sanitized = _sanitize_collection_name(special_chars)
assert sanitized[0].isalnum()
assert sanitized[-1].isalnum()
assert all(c.isalnum() or c in ["_", "-"] for c in sanitized)
def test_sanitize_collection_name_short_name(self) -> None:
"""Test sanitizing a very short name."""
short_name = "A"
sanitized = _sanitize_collection_name(short_name)
assert len(sanitized) >= MIN_COLLECTION_LENGTH
assert sanitized[0].isalnum()
assert sanitized[-1].isalnum()
def test_sanitize_collection_name_bad_ends(self) -> None:
"""Test sanitizing a name with non-alphanumeric start/end."""
bad_ends = "_Agent_"
sanitized = _sanitize_collection_name(bad_ends)
assert sanitized[0].isalnum()
assert sanitized[-1].isalnum()
def test_sanitize_collection_name_none(self) -> None:
"""Test sanitizing a None value."""
sanitized = _sanitize_collection_name(None)
assert sanitized == "default_collection"
def test_sanitize_collection_name_ipv4_pattern(self) -> None:
"""Test sanitizing an IPv4 address."""
ipv4 = "192.168.1.1"
sanitized = _sanitize_collection_name(ipv4)
assert sanitized.startswith("ip_")
assert sanitized[0].isalnum()
assert sanitized[-1].isalnum()
assert all(c.isalnum() or c in ["_", "-"] for c in sanitized)
def test_is_ipv4_pattern(self) -> None:
"""Test IPv4 pattern detection."""
assert _is_ipv4_pattern("192.168.1.1") is True
assert _is_ipv4_pattern("not.an.ip.address") is False
def test_sanitize_collection_name_properties(self) -> None:
"""Test that sanitized collection names always meet ChromaDB requirements."""
test_cases: list[str] = [
"A" * 100, # Very long name
"_start_with_underscore",
"end_with_underscore_",
"contains@special#characters",
"192.168.1.1", # IPv4 address
"a" * 2, # Too short
]
for test_case in test_cases:
sanitized = _sanitize_collection_name(test_case)
assert len(sanitized) >= MIN_COLLECTION_LENGTH
assert len(sanitized) <= MAX_COLLECTION_LENGTH
assert sanitized[0].isalnum()
assert sanitized[-1].isalnum()
def test_sanitize_collection_name_empty_string(self) -> None:
"""Test sanitizing an empty string."""
sanitized = _sanitize_collection_name("")
assert sanitized == "default_collection"
def test_sanitize_collection_name_whitespace_only(self) -> None:
"""Test sanitizing a string with only whitespace."""
sanitized = _sanitize_collection_name(" ")
assert (
sanitized == "a__z"
) # Spaces become underscores, padded to meet requirements
assert len(sanitized) >= MIN_COLLECTION_LENGTH
assert sanitized[0].isalnum()
assert sanitized[-1].isalnum()


@@ -1,250 +0,0 @@
"""Enhanced tests for embedding function factory."""
from unittest.mock import MagicMock, patch
import pytest
from crewai.rag.embeddings.factory import ( # type: ignore[import-untyped]
get_embedding_function,
)
from crewai.rag.embeddings.types import EmbeddingOptions # type: ignore[import-untyped]
def test_get_embedding_function_default() -> None:
"""Test default embedding function when no config provided."""
with patch("crewai.rag.embeddings.factory.OpenAIEmbeddingFunction") as mock_openai:
mock_instance = MagicMock()
mock_openai.return_value = mock_instance
with patch(
"crewai.rag.embeddings.factory.os.getenv", return_value="test-api-key"
):
result = get_embedding_function()
mock_openai.assert_called_once_with(
api_key="test-api-key", model_name="text-embedding-3-small"
)
assert result == mock_instance
def test_get_embedding_function_with_embedding_options() -> None:
"""Test embedding function creation with EmbeddingOptions object."""
with patch("crewai.rag.embeddings.factory.OpenAIEmbeddingFunction") as mock_openai:
mock_instance = MagicMock()
mock_openai.return_value = mock_instance
options = EmbeddingOptions(
provider="openai", api_key="test-key", model="text-embedding-3-large"
)
result = get_embedding_function(options)
call_kwargs = mock_openai.call_args.kwargs
assert "api_key" in call_kwargs
assert call_kwargs["api_key"].get_secret_value() == "test-key"
# OpenAI uses model_name parameter, not model
assert result == mock_instance
def test_get_embedding_function_sentence_transformer() -> None:
"""Test sentence transformer embedding function."""
with patch(
"crewai.rag.embeddings.factory.SentenceTransformerEmbeddingFunction"
) as mock_st:
mock_instance = MagicMock()
mock_st.return_value = mock_instance
config = {"provider": "sentence-transformer", "model_name": "all-MiniLM-L6-v2"}
result = get_embedding_function(config)
mock_st.assert_called_once_with(model_name="all-MiniLM-L6-v2")
assert result == mock_instance
def test_get_embedding_function_ollama() -> None:
"""Test Ollama embedding function."""
with patch("crewai.rag.embeddings.factory.OllamaEmbeddingFunction") as mock_ollama:
mock_instance = MagicMock()
mock_ollama.return_value = mock_instance
config = {
"provider": "ollama",
"model_name": "nomic-embed-text",
"url": "http://localhost:11434",
}
result = get_embedding_function(config)
mock_ollama.assert_called_once_with(
model_name="nomic-embed-text", url="http://localhost:11434"
)
assert result == mock_instance
def test_get_embedding_function_cohere() -> None:
"""Test Cohere embedding function."""
with patch("crewai.rag.embeddings.factory.CohereEmbeddingFunction") as mock_cohere:
mock_instance = MagicMock()
mock_cohere.return_value = mock_instance
config = {
"provider": "cohere",
"api_key": "cohere-key",
"model_name": "embed-english-v3.0",
}
result = get_embedding_function(config)
mock_cohere.assert_called_once_with(
api_key="cohere-key", model_name="embed-english-v3.0"
)
assert result == mock_instance
def test_get_embedding_function_huggingface() -> None:
"""Test HuggingFace embedding function."""
with patch("crewai.rag.embeddings.factory.HuggingFaceEmbeddingFunction") as mock_hf:
mock_instance = MagicMock()
mock_hf.return_value = mock_instance
config = {
"provider": "huggingface",
"api_key": "hf-token",
"model_name": "sentence-transformers/all-MiniLM-L6-v2",
}
result = get_embedding_function(config)
mock_hf.assert_called_once_with(
api_key="hf-token", model_name="sentence-transformers/all-MiniLM-L6-v2"
)
assert result == mock_instance
def test_get_embedding_function_onnx() -> None:
"""Test ONNX embedding function."""
with patch("crewai.rag.embeddings.factory.ONNXMiniLM_L6_V2") as mock_onnx:
mock_instance = MagicMock()
mock_onnx.return_value = mock_instance
config = {"provider": "onnx"}
result = get_embedding_function(config)
mock_onnx.assert_called_once()
assert result == mock_instance
def test_get_embedding_function_google_palm() -> None:
"""Test Google PaLM embedding function."""
with patch(
"crewai.rag.embeddings.factory.GooglePalmEmbeddingFunction"
) as mock_palm:
mock_instance = MagicMock()
mock_palm.return_value = mock_instance
config = {"provider": "google-palm", "api_key": "palm-key"}
result = get_embedding_function(config)
mock_palm.assert_called_once_with(api_key="palm-key")
assert result == mock_instance
def test_get_embedding_function_amazon_bedrock() -> None:
"""Test Amazon Bedrock embedding function."""
with patch(
"crewai.rag.embeddings.factory.AmazonBedrockEmbeddingFunction"
) as mock_bedrock:
mock_instance = MagicMock()
mock_bedrock.return_value = mock_instance
config = {
"provider": "amazon-bedrock",
"region_name": "us-west-2",
"model_name": "amazon.titan-embed-text-v1",
}
result = get_embedding_function(config)
mock_bedrock.assert_called_once_with(
region_name="us-west-2", model_name="amazon.titan-embed-text-v1"
)
assert result == mock_instance
def test_get_embedding_function_jina() -> None:
"""Test Jina embedding function."""
with patch("crewai.rag.embeddings.factory.JinaEmbeddingFunction") as mock_jina:
mock_instance = MagicMock()
mock_jina.return_value = mock_instance
config = {
"provider": "jina",
"api_key": "jina-key",
"model_name": "jina-embeddings-v2-base-en",
}
result = get_embedding_function(config)
mock_jina.assert_called_once_with(
api_key="jina-key", model_name="jina-embeddings-v2-base-en"
)
assert result == mock_instance
def test_get_embedding_function_unsupported_provider() -> None:
"""Test handling of unsupported provider."""
config = {"provider": "unsupported-provider"}
with pytest.raises(ValueError, match="Unsupported provider: unsupported-provider"):
get_embedding_function(config)
def test_get_embedding_function_config_modification() -> None:
"""Test that original config dict is not modified."""
original_config = {
"provider": "openai",
"api_key": "test-key",
"model": "text-embedding-3-small",
}
config_copy = original_config.copy()
with patch("crewai.rag.embeddings.factory.OpenAIEmbeddingFunction"):
get_embedding_function(config_copy)
assert config_copy == original_config
def test_get_embedding_function_exclude_none_values() -> None:
"""Test that None values are excluded from embedding function calls."""
with patch("crewai.rag.embeddings.factory.OpenAIEmbeddingFunction") as mock_openai:
mock_instance = MagicMock()
mock_openai.return_value = mock_instance
options = EmbeddingOptions(provider="openai", api_key="test-key", model=None)
result = get_embedding_function(options)
call_kwargs = mock_openai.call_args.kwargs
assert "api_key" in call_kwargs
assert call_kwargs["api_key"].get_secret_value() == "test-key"
assert "model" not in call_kwargs
assert result == mock_instance
def test_get_embedding_function_instructor() -> None:
"""Test Instructor embedding function."""
with patch(
"crewai.rag.embeddings.factory.InstructorEmbeddingFunction"
) as mock_instructor:
mock_instance = MagicMock()
mock_instructor.return_value = mock_instance
config = {"provider": "instructor", "model_name": "hkunlp/instructor-large"}
result = get_embedding_function(config)
mock_instructor.assert_called_once_with(model_name="hkunlp/instructor-large")
assert result == mock_instance

View File

@@ -1,25 +0,0 @@
from unittest.mock import patch
import pytest
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
def test_configure_embedder_importerror():
configurator = EmbeddingConfigurator()
embedder_config = {
'provider': 'openai',
'config': {
'model': 'text-embedding-ada-002',
}
}
with patch('chromadb.utils.embedding_functions.openai_embedding_function.OpenAIEmbeddingFunction') as mock_openai:
mock_openai.side_effect = ImportError("Module not found.")
with pytest.raises(ImportError) as exc_info:
configurator.configure_embedder(embedder_config)
assert str(exc_info.value) == "Module not found."
mock_openai.assert_called_once()

View File

@@ -1,43 +0,0 @@
"""Tests for import utilities."""
from unittest.mock import patch
import pytest
from crewai.utilities.import_utils import OptionalDependencyError, require
class TestRequire:
"""Test the require function."""
def test_require_existing_module(self):
"""Test requiring a module that exists."""
module = require("json", purpose="testing")
assert module.__name__ == "json"
def test_require_missing_module(self):
"""Test requiring a module that doesn't exist."""
with pytest.raises(OptionalDependencyError) as exc_info:
require("nonexistent_module_xyz", purpose="testing missing module")
error_msg = str(exc_info.value)
assert (
"testing missing module requires the optional dependency 'nonexistent_module_xyz'"
in error_msg
)
assert "uv add nonexistent_module_xyz" in error_msg
def test_require_with_import_error(self):
"""Test that ImportError is properly chained."""
with patch("importlib.import_module") as mock_import:
mock_import.side_effect = ImportError("Module import failed")
with pytest.raises(OptionalDependencyError) as exc_info:
require("some_module", purpose="testing error handling")
assert isinstance(exc_info.value.__cause__, ImportError)
assert str(exc_info.value.__cause__) == "Module import failed"
def test_optional_dependency_error_is_import_error(self):
"""Test that OptionalDependencyError is a subclass of ImportError."""
assert issubclass(OptionalDependencyError, ImportError)

View File

@@ -1,15 +0,0 @@
.DS_Store
.pytest_cache
__pycache__
dist/
.env
.idea
test.py
docs_crew/
chroma.sqlite3
venv
.venv/
# chroma db
db
crewai-rag-tool.lock

View File

@@ -1,335 +0,0 @@
## Building CrewAI Tools
This guide shows you how to build high-quality CrewAI tools that match the patterns in this repository and are ready to be merged. It focuses on: architecture, conventions, environment variables, dependencies, testing, documentation, and a complete example.
### Who this is for
- Contributors creating new tools under `crewai_tools/tools/*`
- Maintainers reviewing PRs for consistency and DX
---
## Quickstart checklist
1. Create a new folder under `crewai_tools/tools/<your_tool_name>/` with a `README.md` and a `<your_tool_name>.py`.
2. Implement a class that ends with `Tool` and subclasses `BaseTool` (or `RagTool` when appropriate).
3. Define a Pydantic `args_schema` with explicit field descriptions and validation.
4. Declare `env_vars` and `package_dependencies` in the class when needed.
5. Lazily initialize clients in `__init__` or `_run` and handle missing credentials with clear errors.
6. Implement `_run(...) -> str | dict` and, if needed, `_arun(...)`.
7. Add tests under `tests/tools/` (unit, no real network calls; mock or record safely).
8. Add a concise tool `README.md` with usage and required env vars.
9. If you add optional dependencies, register them in `pyproject.toml` under `[project.optional-dependencies]` and reference that extra in your tool docs.
10. Run `uv run pytest` and `pre-commit run -a` locally; ensure green.
---
## Tool anatomy and conventions
### BaseTool pattern
All tools follow this structure:
```python
from typing import Any, List, Optional, Type
import os
from pydantic import BaseModel, Field
from crewai.tools import BaseTool, EnvVar
class MyToolInput(BaseModel):
"""Input schema for MyTool."""
query: str = Field(..., description="Your input description here")
limit: int = Field(5, ge=1, le=50, description="Max items to return")
class MyTool(BaseTool):
name: str = "My Tool"
description: str = "Explain succinctly what this tool does and when to use it."
args_schema: Type[BaseModel] = MyToolInput
# Only include when applicable
env_vars: List[EnvVar] = [
EnvVar(name="MY_API_KEY", description="API key for My service", required=True),
]
package_dependencies: List[str] = ["my-sdk"]
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
# Lazy import to keep base install light
try:
import my_sdk # noqa: F401
except Exception as exc:
raise ImportError(
"Missing optional dependency 'my-sdk'. Install with: \n"
" uv add crewai-tools --extra my-sdk\n"
"or\n"
" pip install my-sdk\n"
) from exc
if "MY_API_KEY" not in os.environ:
raise ValueError("Environment variable MY_API_KEY is required for MyTool")
def _run(self, query: str, limit: int = 5, **_: Any) -> str:
"""Synchronous execution. Return a concise string or JSON string."""
# Implement your logic here; do not print. Return the content.
# Handle errors gracefully, return clear messages.
return f"Processed {query} with limit={limit}"
async def _arun(self, *args: Any, **kwargs: Any) -> str:
"""Optional async counterpart if your client supports it."""
# Prefer delegating to _run when the client is thread-safe
return self._run(*args, **kwargs)
```
Key points:
- Class name must end with `Tool` to be autodiscovered by our tooling.
- Use `args_schema` for inputs; always include `description` and validation.
- Validate env vars early and fail with actionable errors.
- Keep outputs deterministic and compact; favor `str` (possibly JSON-encoded) or small dicts converted to strings.
- Avoid printing; return the final string.
### Error handling
- Wrap network and I/O with try/except and return a helpful message. See `BraveSearchTool` and others for patterns.
- Validate required inputs and environment configuration with clear messages.
- Keep exceptions user-friendly; do not leak stack traces. See the sketch below for a typical shape.
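A minimal sketch of this pattern, assuming a hypothetical `fetch_items` helper that performs the network call (not part of this repository):
```python
import json
from typing import Any

import requests


def fetch_items(query: str) -> list[dict[str, Any]]:
    # Hypothetical upstream call, used only to illustrate the error-handling shape.
    resp = requests.get("https://api.example.com/items", params={"q": query}, timeout=10)
    resp.raise_for_status()
    return resp.json().get("items", [])


def run_safely(query: str) -> str:
    """Wrap network I/O and return clear, user-friendly messages instead of stack traces."""
    try:
        items = fetch_items(query)
        return json.dumps(items[:5], indent=2)
    except requests.Timeout:
        return "The upstream service timed out. Please try again later."
    except requests.HTTPError as e:
        return f"Upstream error: {e.response.status_code}"
    except Exception as e:
        return f"Unexpected error: {e}"
```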
### Rate limiting and retries
- If the upstream API enforces request pacing, implement minimal rate limiting (see `BraveSearchTool`).
- Consider idempotency and backoff for transient errors where appropriate; a simple retry-with-backoff sketch follows this list.
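A small retry-with-backoff sketch for transient errors; this is illustrative only, not a pattern the repository mandates:
```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_backoff(call: Callable[[], T], retries: int = 3, base_delay: float = 0.5) -> T:
    """Retry a callable on failure with exponential backoff plus a little jitter."""
    for attempt in range(retries):
        try:
            return call()
        except Exception:
            if attempt == retries - 1:
                raise
            # Exponential backoff with jitter to avoid hammering the upstream service.
            time.sleep(base_delay * (2**attempt) + random.uniform(0, 0.1))
    raise RuntimeError("retries must be >= 1")
```
Only retry operations that are safe to repeat, and keep the total wait short so agents are not blocked.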
### Async support
- Implement `_arun` only if your library has a true async client or your sync calls are thread-safe.
- Otherwise, delegate `_arun` to `_run` as in multiple existing tools.
### Returning values
- Return a string (or JSON string) that's ready to display in an agent transcript.
- If returning structured data, keep it small and human-readable. Use stable keys and ordering, as in the sketch below.
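As a small illustration of stable keys and ordering (the field names here are made up), serializing with `sort_keys` and an explicit row order keeps the output reproducible:
```python
import json
from typing import Any


def format_result(rows: list[dict[str, Any]]) -> str:
    """Serialize a small result set deterministically for an agent transcript."""
    # Sort rows explicitly and sort keys so repeated runs produce identical output.
    ordered = sorted(rows, key=lambda r: str(r.get("id", "")))
    return json.dumps(ordered, sort_keys=True, indent=2, ensure_ascii=False)
```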
---
## RAG tools and adapters
If your tool is a knowledge source, consider extending `RagTool` and/or creating an adapter.
- `RagTool` exposes `add(...)` and a `query(question: str) -> str` contract through an `Adapter`.
- See `crewai_tools/tools/rag/rag_tool.py` and adapters like `embedchain_adapter.py` and `lancedb_adapter.py`.
Minimal adapter example:
```python
from typing import Any
from pydantic import BaseModel
from crewai_tools.tools.rag.rag_tool import Adapter, RagTool
class MemoryAdapter(Adapter):
store: list[str] = []
def add(self, text: str, **_: Any) -> None:
self.store.append(text)
def query(self, question: str) -> str:
# naive demo: return all text containing any word from the question
tokens = set(question.lower().split())
hits = [t for t in self.store if tokens & set(t.lower().split())]
return "\n".join(hits) if hits else "No relevant content found."
class MemoryRagTool(RagTool):
    name: str = "In-memory RAG"
description: str = "Toy RAG that stores text in memory and returns matches."
adapter: Adapter = MemoryAdapter()
```
When using external vector DBs (MongoDB, Qdrant, Weaviate), study the existing tools to follow indexing, embedding, and query configuration patterns closely.
---
## Toolkits (multiple related tools)
Some integrations expose a toolkit (a group of tools) rather than a single class. See Bedrock `browser_toolkit.py` and `code_interpreter_toolkit.py`.
Guidelines:
- Provide small, focused `BaseTool` classes for each operation (e.g., `navigate`, `click`, `extract_text`).
- Offer a helper `create_<name>_toolkit(...) -> Tuple[ToolkitClass, List[BaseTool]]` to create tools and manage resources.
- If you open external resources (browsers, interpreters), support cleanup methods and optionally context manager usage. A minimal toolkit-factory sketch follows this list.
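A minimal sketch of the toolkit-factory shape described above. The names (`ExampleToolkit`, `PingTool`, `create_example_toolkit`) are hypothetical; real toolkits such as the Bedrock ones manage actual external resources:
```python
from typing import Any, List, Tuple, Type

from crewai.tools import BaseTool
from pydantic import BaseModel, Field


class PingInput(BaseModel):
    message: str = Field("", description="Message to echo back")


class PingTool(BaseTool):
    name: str = "Ping"
    description: str = "Return a pong for a given message."
    args_schema: Type[BaseModel] = PingInput

    def _run(self, message: str = "", **_: Any) -> str:
        return f"pong: {message}"


class ExampleToolkit:
    """Owns shared resources and hands out small, focused tools."""

    def __init__(self) -> None:
        self._tools: List[BaseTool] = [PingTool()]

    @property
    def tools(self) -> List[BaseTool]:
        return self._tools

    def cleanup(self) -> None:
        # Release any external resources (browsers, interpreters, ...) here.
        pass


def create_example_toolkit(**_: Any) -> Tuple[ExampleToolkit, List[BaseTool]]:
    """Create the toolkit and return it together with its tools."""
    toolkit = ExampleToolkit()
    return toolkit, toolkit.tools
```
Callers keep the toolkit handle so they can call `cleanup()` once the crew has finished.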
---
## Environment variables and dependencies
### env_vars
- Declare as `env_vars: List[EnvVar]` with `name`, `description`, `required`, and optional `default`.
- Validate presence in `__init__` or on first `_run` call; a lazy-validation sketch is shown below.
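A short sketch of lazy validation on first `_run` call; `EchoTool` and `ECHO_API_KEY` are hypothetical:
```python
import os
from typing import Any, List, Type

from crewai.tools import BaseTool, EnvVar
from pydantic import BaseModel, Field


class EchoInput(BaseModel):
    text: str = Field(..., description="Text to echo")


class EchoTool(BaseTool):
    name: str = "Echo"
    description: str = "Echoes text; validates its API key lazily on first use."
    args_schema: Type[BaseModel] = EchoInput
    env_vars: List[EnvVar] = [
        EnvVar(name="ECHO_API_KEY", description="API key for the echo service", required=True),
    ]

    def _run(self, text: str, **_: Any) -> str:
        # Lazy validation: fail with an actionable message the first time the tool runs.
        if "ECHO_API_KEY" not in os.environ:
            raise ValueError("Environment variable ECHO_API_KEY is required for EchoTool")
        return text
```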
### Dependencies
- List runtime packages in `package_dependencies` on the class.
- If they are genuinely optional, add an extra under `[project.optional-dependencies]` in `pyproject.toml` (e.g., `tavily-python`, `serpapi`, `scrapfly-sdk`).
- Use lazy imports to avoid hard deps for users who don't need the tool.
---
## Testing
Place tests under `tests/tools/` and follow these rules:
- Do not hit real external services in CI. Use mocks, fakes, or recorded fixtures where allowed.
- Validate input validation, env var handling, error messages, and happy path output formatting.
- Keep tests fast and deterministic.
Example skeleton (`tests/tools/my_tool_test.py`):
```python
import os
import pytest
from crewai_tools.tools.my_tool.my_tool import MyTool
def test_requires_env_var(monkeypatch):
monkeypatch.delenv("MY_API_KEY", raising=False)
with pytest.raises(ValueError):
MyTool()
def test_happy_path(monkeypatch):
monkeypatch.setenv("MY_API_KEY", "test")
tool = MyTool()
result = tool.run(query="hello", limit=2)
assert "hello" in result
```
Run locally:
```bash
uv run pytest
pre-commit run -a
```
---
## Documentation
Each tool must include a `README.md` in its folder with:
- What it does and when to use it
- Required env vars and optional extras (with install snippet)
- Minimal usage example
Update the root `README.md` only if the tool introduces a new category or notable capability.
---
## Discovery and specs
Our internal tooling discovers classes whose names end with `Tool`. Keep your class exported from the module path under `crewai_tools/tools/...` to be picked up by scripts like `generate_tool_specs.py`.
---
## Full example: “Weather Search Tool”
This example demonstrates: `args_schema`, `env_vars`, `package_dependencies`, lazy imports, validation, and robust error handling.
```python
# file: crewai_tools/tools/weather_tool/weather_tool.py
from typing import Any, List, Optional, Type
import os
import requests
from pydantic import BaseModel, Field
from crewai.tools import BaseTool, EnvVar
class WeatherToolInput(BaseModel):
"""Input schema for WeatherTool."""
city: str = Field(..., description="City name, e.g., 'Berlin'")
country: Optional[str] = Field(None, description="ISO country code, e.g., 'DE'")
units: str = Field(
default="metric",
description="Units system: 'metric' or 'imperial'",
pattern=r"^(metric|imperial)$",
)
class WeatherTool(BaseTool):
name: str = "Weather Search"
description: str = (
"Look up current weather for a city using a public weather API."
)
args_schema: Type[BaseModel] = WeatherToolInput
env_vars: List[EnvVar] = [
EnvVar(
name="WEATHER_API_KEY",
description="API key for the weather service",
required=True,
),
]
package_dependencies: List[str] = ["requests"]
base_url: str = "https://api.openweathermap.org/data/2.5/weather"
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
if "WEATHER_API_KEY" not in os.environ:
raise ValueError("WEATHER_API_KEY is required for WeatherTool")
def _run(self, city: str, country: Optional[str] = None, units: str = "metric") -> str:
try:
q = f"{city},{country}" if country else city
params = {
"q": q,
"units": units,
"appid": os.environ["WEATHER_API_KEY"],
}
resp = requests.get(self.base_url, params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
main = data.get("weather", [{}])[0].get("main", "Unknown")
desc = data.get("weather", [{}])[0].get("description", "")
temp = data.get("main", {}).get("temp")
feels = data.get("main", {}).get("feels_like")
city_name = data.get("name", city)
return (
f"Weather in {city_name}: {main} ({desc}). "
f"Temperature: {temp}°, feels like {feels}°."
)
except requests.Timeout:
return "Weather service timed out. Please try again later."
except requests.HTTPError as e:
return f"Weather service error: {e.response.status_code} {e.response.text[:120]}"
except Exception as e:
return f"Unexpected error fetching weather: {e}"
```
Folder layout:
```
crewai_tools/tools/weather_tool/
├─ weather_tool.py
└─ README.md
```
And `README.md` should document env vars and usage.
---
## PR checklist
- [ ] Tool lives under `crewai_tools/tools/<name>/`
- [ ] Class ends with `Tool` and subclasses `BaseTool` (or `RagTool`)
- [ ] Precise `args_schema` with descriptions and validation
- [ ] `env_vars` declared (if any) and validated
- [ ] `package_dependencies` and optional extras added in `pyproject.toml` (if any)
- [ ] Clear error handling; no prints
- [ ] Unit tests added (`tests/tools/`), fast and deterministic
- [ ] Tool `README.md` with usage and env vars
- [ ] `pre-commit` and `pytest` pass locally
---
## Tips for great DX
- Keep responses short and useful—agents quote your tool output directly.
- Validate early; fail fast with actionable guidance.
- Prefer lazy imports; minimize default install surface.
- Mirror patterns from similar tools in this repo for a consistent developer experience.
Happy building!

View File

@@ -1,21 +0,0 @@
MIT License
Copyright (c) 2024 João Moura
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -1,229 +0,0 @@
<div align="center">
![Logo of crewAI, two people rowing on a boat](./assets/crewai_logo.png)
<div align="left">
# CrewAI Tools
Empower your CrewAI agents with powerful, customizable tools to elevate their capabilities and tackle sophisticated, real-world tasks.
CrewAI Tools provide the essential functionality to extend your agents, helping you rapidly enhance your automations with reliable, ready-to-use tools or custom-built solutions tailored precisely to your needs.
---
## Quick Links
[Homepage](https://www.crewai.com/) | [Documentation](https://docs.crewai.com/) | [Examples](https://github.com/crewAIInc/crewAI-examples) | [Community](https://community.crewai.com/)
---
## Available Tools
CrewAI provides an extensive collection of powerful tools ready to enhance your agents:
- **File Management**: `FileReadTool`, `FileWriteTool`
- **Web Scraping**: `ScrapeWebsiteTool`, `SeleniumScrapingTool`
- **Database Integrations**: `PGSearchTool`, `MySQLSearchTool`
- **Vector Database Integrations**: `MongoDBVectorSearchTool`, `QdrantVectorSearchTool`, `WeaviateVectorSearchTool`
- **API Integrations**: `SerperApiTool`, `EXASearchTool`
- **AI-powered Tools**: `DallETool`, `VisionTool`, `StagehandTool`
And many more robust tools to simplify your agent integrations.
---
## Creating Custom Tools
CrewAI offers two straightforward approaches to creating custom tools:
### Subclassing `BaseTool`
Define your tool by subclassing:
```python
from crewai.tools import BaseTool
class MyCustomTool(BaseTool):
name: str = "Tool Name"
description: str = "Detailed description here."
def _run(self, *args, **kwargs):
        # Your tool logic here
        return "tool output"
```
### Using the `tool` Decorator
Quickly create lightweight tools using decorators:
```python
from crewai import tool
@tool("Tool Name")
def my_custom_function(input):
    # Tool logic here
    return f"Processed: {input}"
```
---
## CrewAI Tools and MCP
CrewAI Tools supports the Model Context Protocol (MCP). This gives you access to thousands of tools from the hundreds of community-built MCP servers.
Before you start using MCP with CrewAI tools, you need to install the `mcp` extra dependencies:
```bash
pip install crewai-tools[mcp]
# or
uv add crewai-tools --extra mcp
```
To quickly get started with MCP in CrewAI, you have two options:
### Option 1: Fully managed connection
In this scenario we use a context manager (`with` statement) to start and stop the connection with the MCP server.
This is done in the background, and you only interact with the CrewAI tools corresponding to the MCP server's tools.
For an STDIO based MCP server:
```python
import os

from mcp import StdioServerParameters
from crewai_tools import MCPServerAdapter
serverparams = StdioServerParameters(
command="uvx",
args=["--quiet", "pubmedmcp@0.1.3"],
env={"UV_PYTHON": "3.12", **os.environ},
)
with MCPServerAdapter(serverparams) as tools:
# tools is now a list of CrewAI Tools matching 1:1 with the MCP server's tools
agent = Agent(..., tools=tools)
task = Task(...)
crew = Crew(..., agents=[agent], tasks=[task])
crew.kickoff(...)
```
For an SSE based MCP server:
```python
serverparams = {"url": "http://localhost:8000/sse"}
with MCPServerAdapter(serverparams) as tools:
# tools is now a list of CrewAI Tools matching 1:1 with the MCP server's tools
agent = Agent(..., tools=tools)
task = Task(...)
crew = Crew(..., agents=[agent], tasks=[task])
crew.kickoff(...)
```
### Option 2: More control over the MCP connection
If you need more control over the MCP connection, you can instantiate the MCPServerAdapter into an `mcp_server_adapter` object, which can be used to manage the connection with the MCP server and access the available tools.
**Important**: in this case you need to call `mcp_server_adapter.stop()` to make sure the connection is correctly stopped. We recommend using a `try ... finally` block so that `.stop()` is called even in case of errors.
Here is the same example for an STDIO MCP Server:
```python
import os

from mcp import StdioServerParameters
from crewai_tools import MCPServerAdapter
serverparams = StdioServerParameters(
command="uvx",
args=["--quiet", "pubmedmcp@0.1.3"],
env={"UV_PYTHON": "3.12", **os.environ},
)
try:
mcp_server_adapter = MCPServerAdapter(serverparams)
tools = mcp_server_adapter.tools
# tools is now a list of CrewAI Tools matching 1:1 with the MCP server's tools
agent = Agent(..., tools=tools)
task = Task(...)
crew = Crew(..., agents=[agent], tasks=[task])
crew.kickoff(...)
# ** important ** don't forget to stop the connection
finally:
mcp_server_adapter.stop()
```
And finally, the same example for an SSE MCP server:
```python
from crewai_tools import MCPServerAdapter
serverparams = {"url": "http://localhost:8000/sse"}
try:
mcp_server_adapter = MCPServerAdapter(serverparams)
tools = mcp_server_adapter.tools
# tools is now a list of CrewAI Tools matching 1:1 with the MCP server's tools
agent = Agent(..., tools=tools)
task = Task(...)
crew = Crew(..., agents=[agent], tasks=[task])
crew.kickoff(...)
# ** important ** don't forget to stop the connection
finally:
mcp_server_adapter.stop()
```
### Considerations & Limitations
#### Staying Safe with MCP
Always make sure that you trust the MCP server before using it. Using an STDIO server will execute code on your machine. Using SSE is not a silver bullet either: a malicious MCP server can still inject content into your application.
#### Limitations
* At this time we only support tools from MCP servers, not other types of primitives such as prompts or resources.
* We only return the first text output returned by the MCP server tool, using `.content[0].text`.
---
## Why Use CrewAI Tools?
- **Simplicity & Flexibility**: Easy-to-use yet powerful enough for complex workflows.
- **Rapid Integration**: Seamlessly incorporate external services, APIs, and databases.
- **Enterprise Ready**: Built for stability, performance, and consistent results.
---
## Contribution Guidelines
We welcome contributions from the community!
1. Fork and clone the repository.
2. Create a new branch (`git checkout -b feature/my-feature`).
3. Commit your changes (`git commit -m 'Add my feature'`).
4. Push your branch (`git push origin feature/my-feature`).
5. Open a pull request.
---
## Developer Quickstart
```shell
pip install crewai[tools]
```
### Development Setup
- Install dependencies: `uv sync`
- Run tests: `uv run pytest`
- Run static type checking: `uv run pyright`
- Set up pre-commit hooks: `pre-commit install`
---
## Support and Community
Join our rapidly growing community and receive real-time support:
- [Discourse](https://community.crewai.com/)
- [Open an Issue](https://github.com/crewAIInc/crewAI/issues)
Build smarter, faster, and more powerful AI solutions—powered by CrewAI Tools.

Two binary image assets (14 KiB each) removed; previews not shown.

View File

@@ -1,155 +0,0 @@
#!/usr/bin/env python3
import inspect
import json
from pathlib import Path
from typing import Any
from crewai.tools.base_tool import BaseTool, EnvVar
from crewai_tools import tools
from pydantic.json_schema import GenerateJsonSchema
from pydantic_core import PydanticOmit
class SchemaGenerator(GenerateJsonSchema):
def handle_invalid_for_json_schema(self, schema, error_info):
raise PydanticOmit
class ToolSpecExtractor:
def __init__(self) -> None:
self.tools_spec: list[dict[str, Any]] = []
self.processed_tools: set[str] = set()
def extract_all_tools(self) -> list[dict[str, Any]]:
for name in dir(tools):
if name.endswith("Tool") and name not in self.processed_tools:
obj = getattr(tools, name, None)
if inspect.isclass(obj):
self.extract_tool_info(obj)
self.processed_tools.add(name)
return self.tools_spec
def extract_tool_info(self, tool_class: BaseTool) -> None:
try:
core_schema = tool_class.__pydantic_core_schema__
if not core_schema:
return
schema = self._unwrap_schema(core_schema)
fields = schema.get("schema", {}).get("fields", {})
tool_info = {
"name": tool_class.__name__,
"humanized_name": self._extract_field_default(
fields.get("name"), fallback=tool_class.__name__
),
"description": self._extract_field_default(
fields.get("description")
).strip(),
"run_params_schema": self._extract_params(fields.get("args_schema")),
"init_params_schema": self._extract_init_params(tool_class),
"env_vars": self._extract_env_vars(fields.get("env_vars")),
"package_dependencies": self._extract_field_default(
fields.get("package_dependencies"), fallback=[]
),
}
self.tools_spec.append(tool_info)
except Exception as e:
print(f"Error extracting {tool_class.__name__}: {e}")
def _unwrap_schema(self, schema: dict) -> dict:
while (
schema.get("type") in {"function-after", "default"} and "schema" in schema
):
schema = schema["schema"]
return schema
def _extract_field_default(self, field: dict | None, fallback: str = "") -> str:
if not field:
return fallback
schema = field.get("schema", {})
default = schema.get("default")
return default if isinstance(default, (list, str, int)) else fallback
    def _extract_params(
        self, args_schema_field: dict | None
    ) -> dict[str, Any]:
if not args_schema_field:
return {}
args_schema_class = args_schema_field.get("schema", {}).get("default")
if not (
inspect.isclass(args_schema_class)
and hasattr(args_schema_class, "__pydantic_core_schema__")
):
return {}
try:
return args_schema_class.model_json_schema(
schema_generator=SchemaGenerator, mode="validation"
)
except Exception as e:
print(f"Error extracting params from {args_schema_class}: {e}")
return {}
def _extract_env_vars(self, env_vars_field: dict | None) -> list[dict[str, str]]:
if not env_vars_field:
return []
env_vars = []
for env_var in env_vars_field.get("schema", {}).get("default", []):
if isinstance(env_var, EnvVar):
env_vars.append(
{
"name": env_var.name,
"description": env_var.description,
"required": env_var.required,
"default": env_var.default,
}
)
return env_vars
def _extract_init_params(self, tool_class: BaseTool) -> dict:
ignored_init_params = [
"name",
"description",
"env_vars",
"args_schema",
"description_updated",
"cache_function",
"result_as_answer",
"max_usage_count",
"current_usage_count",
"package_dependencies",
]
json_schema = tool_class.model_json_schema(
schema_generator=SchemaGenerator, mode="serialization"
)
properties = {}
for key, value in json_schema["properties"].items():
if key not in ignored_init_params:
properties[key] = value
json_schema["properties"] = properties
return json_schema
def save_to_json(self, output_path: str) -> None:
with open(output_path, "w", encoding="utf-8") as f:
json.dump({"tools": self.tools_spec}, f, indent=2, sort_keys=True)
print(f"Saved tool specs to {output_path}")
if __name__ == "__main__":
output_file = Path(__file__).parent / "tool.specs.json"
extractor = ToolSpecExtractor()
specs = extractor.extract_all_tools()
extractor.save_to_json(str(output_file))
print(f"Extracted {len(specs)} tool classes.")

View File

@@ -1,172 +0,0 @@
[project]
name = "crewai-tools"
dynamic = ["version"]
description = "Set of tools for the crewAI framework"
readme = "README.md"
authors = [
{ name = "João Moura", email = "joaomdmoura@gmail.com" },
]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai-core",
"click>=8.1.8",
"lancedb>=0.5.4",
"pytube>=15.0.0",
"requests>=2.31.0",
"docker>=7.1.0",
"tiktoken>=0.8.0",
"stagehand>=0.4.1",
"portalocker==2.7.0",
"beautifulsoup4>=4.13.4",
"pypdf>=5.9.0",
"python-docx>=1.2.0",
"youtube-transcript-api>=1.2.2",
]
[project.urls]
Homepage = "https://crewai.com"
Repository = "https://github.com/crewAIInc/crewAI-tools"
Documentation = "https://docs.crewai.com"
[project.optional-dependencies]
scrapfly-sdk = [
"scrapfly-sdk>=0.8.19",
]
sqlalchemy = [
"sqlalchemy>=2.0.35",
]
multion = [
"multion>=1.1.0",
]
firecrawl-py = [
"firecrawl-py>=1.8.0",
]
composio-core = [
"composio-core>=0.6.11.post1",
]
browserbase = [
"browserbase>=1.0.5",
]
weaviate-client = [
"weaviate-client>=4.10.2",
]
patronus = [
"patronus>=0.0.16",
]
serpapi = [
"serpapi>=0.1.5",
]
beautifulsoup4 = [
"beautifulsoup4>=4.12.3",
]
selenium = [
"selenium>=4.27.1",
]
spider-client = [
"spider-client>=0.1.25",
]
scrapegraph-py = [
"scrapegraph-py>=1.9.0",
]
linkup-sdk = [
"linkup-sdk>=0.2.2",
]
tavily-python = [
"tavily-python>=0.5.4",
]
hyperbrowser = [
"hyperbrowser>=0.18.0",
]
snowflake = [
"cryptography>=43.0.3",
"snowflake-connector-python>=3.12.4",
"snowflake-sqlalchemy>=1.7.3",
]
singlestore = [
"singlestoredb>=1.12.4",
"SQLAlchemy>=2.0.40",
]
exa-py = [
"exa-py>=1.8.7",
]
qdrant-client = [
"qdrant-client>=1.12.1",
]
apify = [
"langchain-apify>=0.1.2,<1.0.0",
]
databricks-sdk = [
"databricks-sdk>=0.46.0",
]
couchbase = [
"couchbase>=4.3.5",
]
mcp = [
"mcp>=1.6.0",
"mcpadapt>=0.1.9",
]
stagehand = [
"stagehand>=0.4.1",
]
github = [
"gitpython==3.1.38",
"PyGithub==1.59.1",
]
rag = [
"python-docx>=1.1.0",
"lxml>=5.3.0,<5.4.0", # Pin to avoid etree import issues in 5.4.0
]
xml = [
"unstructured[local-inference, all-docs]>=0.17.2"
]
oxylabs = [
"oxylabs==2.0.0"
]
mongodb = [
"pymongo>=4.13"
]
mysql = [
"pymysql>=1.1.1"
]
postgresql = [
"psycopg2-binary>=2.9.10"
]
bedrock = [
"beautifulsoup4>=4.13.4",
"bedrock-agentcore>=0.1.0",
"playwright>=1.52.0",
"nest-asyncio>=1.6.0",
]
contextual = [
"contextual-client>=0.1.0",
"nest-asyncio>=1.6.0",
]
[tool.hatch.metadata]
allow-direct-references = true
[tool.pytest.ini_options]
testpaths = ["tests"]
[tool.hatch.version]
path = "src/crewai_tools/__init__.py"
[build-system]
requires = [
"hatchling",
]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/crewai_tools"]
[dependency-groups]
dev = [
"pytest-asyncio>=0.25.2",
"pytest>=8.0.0",
"pytest-recording>=0.13.3",
"mypy>=1.18.1",
"ruff>=0.13.0",
]

View File

@@ -1,100 +0,0 @@
__version__ = "0.71.0"
from .adapters.enterprise_adapter import EnterpriseActionTool
from .adapters.mcp_adapter import MCPServerAdapter
from .adapters.zapier_adapter import ZapierActionTool
from .aws import (
BedrockInvokeAgentTool,
BedrockKBRetrieverTool,
S3ReaderTool,
S3WriterTool,
)
from .tools import (
AIMindTool,
ApifyActorsTool,
ArxivPaperTool,
BraveSearchTool,
BrightDataDatasetTool,
BrightDataSearchTool,
BrightDataWebUnlockerTool,
BrowserbaseLoadTool,
CodeDocsSearchTool,
CodeInterpreterTool,
ComposioTool,
ContextualAICreateAgentTool,
ContextualAIParseTool,
ContextualAIQueryTool,
ContextualAIRerankTool,
CouchbaseFTSVectorSearchTool,
CrewaiEnterpriseTools,
CrewaiPlatformTools,
CSVSearchTool,
DallETool,
DatabricksQueryTool,
DirectoryReadTool,
DirectorySearchTool,
DOCXSearchTool,
EXASearchTool,
FileCompressorTool,
FileReadTool,
FileWriterTool,
FirecrawlCrawlWebsiteTool,
FirecrawlScrapeWebsiteTool,
FirecrawlSearchTool,
GenerateCrewaiAutomationTool,
GithubSearchTool,
HyperbrowserLoadTool,
InvokeCrewAIAutomationTool,
JSONSearchTool,
LinkupSearchTool,
LlamaIndexTool,
MDXSearchTool,
MongoDBVectorSearchConfig,
MongoDBVectorSearchTool,
MultiOnTool,
MySQLSearchTool,
NL2SQLTool,
OCRTool,
OxylabsAmazonProductScraperTool,
OxylabsAmazonSearchScraperTool,
OxylabsGoogleSearchScraperTool,
OxylabsUniversalScraperTool,
ParallelSearchTool,
PatronusEvalTool,
PatronusLocalEvaluatorTool,
PatronusPredefinedCriteriaEvalTool,
PDFSearchTool,
PGSearchTool,
QdrantVectorSearchTool,
RagTool,
ScrapeElementFromWebsiteTool,
ScrapegraphScrapeTool,
ScrapegraphScrapeToolSchema,
ScrapeWebsiteTool,
ScrapflyScrapeWebsiteTool,
SeleniumScrapingTool,
SerpApiGoogleSearchTool,
SerpApiGoogleShoppingTool,
SerperDevTool,
SerperScrapeWebsiteTool,
SerplyJobSearchTool,
SerplyNewsSearchTool,
SerplyScholarSearchTool,
SerplyWebpageToMarkdownTool,
SerplyWebSearchTool,
SingleStoreSearchTool,
SnowflakeConfig,
SnowflakeSearchTool,
SpiderTool,
StagehandTool,
TavilyExtractorTool,
TavilySearchTool,
TXTSearchTool,
VisionTool,
WeaviateVectorSearchTool,
WebsiteSearchTool,
XMLSearchTool,
YoutubeChannelSearchTool,
YoutubeVideoSearchTool,
ZapierActionTools,
)

View File

@@ -1,267 +0,0 @@
"""Adapter for CrewAI's native RAG system."""
import hashlib
from pathlib import Path
from typing import Any, TypeAlias, TypedDict
from crewai.rag.config.types import RagConfigType
from crewai.rag.config.utils import get_rag_client
from crewai.rag.core.base_client import BaseClient
from crewai.rag.factory import create_client
from crewai.rag.types import BaseRecord, SearchResult
from crewai_tools.rag.data_types import DataType
from crewai_tools.rag.misc import sanitize_metadata_for_chromadb
from crewai_tools.tools.rag.rag_tool import Adapter
from pydantic import PrivateAttr
from typing_extensions import Unpack
ContentItem: TypeAlias = str | Path | dict[str, Any]
class AddDocumentParams(TypedDict, total=False):
"""Parameters for adding documents to the RAG system."""
data_type: DataType
metadata: dict[str, Any]
website: str
url: str
file_path: str | Path
github_url: str
youtube_url: str
directory_path: str | Path
class CrewAIRagAdapter(Adapter):
"""Adapter that uses CrewAI's native RAG system.
Supports custom vector database configuration through the config parameter.
"""
collection_name: str = "default"
summarize: bool = False
similarity_threshold: float = 0.6
limit: int = 5
config: RagConfigType | None = None
_client: BaseClient | None = PrivateAttr(default=None)
def model_post_init(self, __context: Any) -> None:
"""Initialize the CrewAI RAG client after model initialization."""
if self.config is not None:
self._client = create_client(self.config)
else:
self._client = get_rag_client()
self._client.get_or_create_collection(collection_name=self.collection_name)
def query(
self,
question: str,
similarity_threshold: float | None = None,
limit: int | None = None,
) -> str:
"""Query the knowledge base with a question.
Args:
question: The question to ask
similarity_threshold: Minimum similarity score for results (default: 0.6)
limit: Maximum number of results to return (default: 5)
Returns:
Relevant content from the knowledge base
"""
search_limit = limit if limit is not None else self.limit
search_threshold = (
similarity_threshold
if similarity_threshold is not None
else self.similarity_threshold
)
results: list[SearchResult] = self._client.search(
collection_name=self.collection_name,
query=question,
limit=search_limit,
score_threshold=search_threshold,
)
if not results:
return "No relevant content found."
contents: list[str] = []
for result in results:
content: str = result.get("content", "")
if content:
contents.append(content)
return "\n\n".join(contents)
def add(self, *args: ContentItem, **kwargs: Unpack[AddDocumentParams]) -> None:
"""Add content to the knowledge base.
This method handles various input types and converts them to documents
for the vector database. It supports the data_type parameter for
compatibility with existing tools.
Args:
*args: Content items to add (strings, paths, or document dicts)
**kwargs: Additional parameters including data_type, metadata, etc.
"""
import os
from crewai_tools.rag.base_loader import LoaderResult
from crewai_tools.rag.data_types import DataType, DataTypes
from crewai_tools.rag.source_content import SourceContent
documents: list[BaseRecord] = []
data_type: DataType | None = kwargs.get("data_type")
base_metadata: dict[str, Any] = kwargs.get("metadata", {})
for arg in args:
source_ref: str
if isinstance(arg, dict):
source_ref = str(arg.get("source", arg.get("content", "")))
else:
source_ref = str(arg)
if not data_type:
data_type = DataTypes.from_content(source_ref)
if data_type == DataType.DIRECTORY:
if not os.path.isdir(source_ref):
raise ValueError(f"Directory does not exist: {source_ref}")
# Define binary and non-text file extensions to skip
binary_extensions = {
".pyc",
".pyo",
".png",
".jpg",
".jpeg",
".gif",
".bmp",
".ico",
".svg",
".webp",
".pdf",
".zip",
".tar",
".gz",
".bz2",
".7z",
".rar",
".exe",
".dll",
".so",
".dylib",
".bin",
".dat",
".db",
".sqlite",
".class",
".jar",
".war",
".ear",
}
for root, dirs, files in os.walk(source_ref):
dirs[:] = [d for d in dirs if not d.startswith(".")]
for filename in files:
if filename.startswith("."):
continue
# Skip binary files based on extension
file_ext = os.path.splitext(filename)[1].lower()
if file_ext in binary_extensions:
continue
# Skip __pycache__ directories
if "__pycache__" in root:
continue
file_path: str = os.path.join(root, filename)
try:
file_data_type: DataType = DataTypes.from_content(file_path)
file_loader = file_data_type.get_loader()
file_chunker = file_data_type.get_chunker()
file_source = SourceContent(file_path)
file_result: LoaderResult = file_loader.load(file_source)
file_chunks = file_chunker.chunk(file_result.content)
for chunk_idx, file_chunk in enumerate(file_chunks):
file_metadata: dict[str, Any] = base_metadata.copy()
file_metadata.update(file_result.metadata)
file_metadata["data_type"] = str(file_data_type)
file_metadata["file_path"] = file_path
file_metadata["chunk_index"] = chunk_idx
file_metadata["total_chunks"] = len(file_chunks)
if isinstance(arg, dict):
file_metadata.update(arg.get("metadata", {}))
chunk_id = hashlib.sha256(
f"{file_result.doc_id}_{chunk_idx}_{file_chunk}".encode()
).hexdigest()
documents.append(
{
"doc_id": chunk_id,
"content": file_chunk,
"metadata": sanitize_metadata_for_chromadb(
file_metadata
),
}
)
except Exception:
# Silently skip files that can't be processed
continue
else:
metadata: dict[str, Any] = base_metadata.copy()
if data_type in [
DataType.PDF_FILE,
DataType.TEXT_FILE,
DataType.DOCX,
DataType.CSV,
DataType.JSON,
DataType.XML,
DataType.MDX,
]:
if not os.path.isfile(source_ref):
raise FileNotFoundError(f"File does not exist: {source_ref}")
loader = data_type.get_loader()
chunker = data_type.get_chunker()
source_content = SourceContent(source_ref)
loader_result: LoaderResult = loader.load(source_content)
chunks = chunker.chunk(loader_result.content)
for i, chunk in enumerate(chunks):
chunk_metadata: dict[str, Any] = metadata.copy()
chunk_metadata.update(loader_result.metadata)
chunk_metadata["data_type"] = str(data_type)
chunk_metadata["chunk_index"] = i
chunk_metadata["total_chunks"] = len(chunks)
chunk_metadata["source"] = source_ref
if isinstance(arg, dict):
chunk_metadata.update(arg.get("metadata", {}))
chunk_id = hashlib.sha256(
f"{loader_result.doc_id}_{i}_{chunk}".encode()
).hexdigest()
documents.append(
{
"doc_id": chunk_id,
"content": chunk,
"metadata": sanitize_metadata_for_chromadb(chunk_metadata),
}
)
if documents:
self._client.add_documents(
collection_name=self.collection_name, documents=documents
)

View File

@@ -1,434 +0,0 @@
import json
import os
import re
import warnings
from typing import Any, Literal, Optional, Union, cast, get_origin
import requests
from crewai.tools import BaseTool
from pydantic import Field, create_model
def get_enterprise_api_base_url() -> str:
"""Get the enterprise API base URL from environment or use default."""
base_url = os.getenv("CREWAI_PLUS_URL", "https://app.crewai.com")
return f"{base_url}/crewai_plus/api/v1/integrations"
ENTERPRISE_API_BASE_URL = get_enterprise_api_base_url()
class EnterpriseActionTool(BaseTool):
"""A tool that executes a specific enterprise action."""
enterprise_action_token: str = Field(
default="", description="The enterprise action token"
)
action_name: str = Field(default="", description="The name of the action")
action_schema: dict[str, Any] = Field(
default={}, description="The schema of the action"
)
enterprise_api_base_url: str = Field(
default=ENTERPRISE_API_BASE_URL, description="The base API URL"
)
def __init__(
self,
name: str,
description: str,
enterprise_action_token: str,
action_name: str,
action_schema: dict[str, Any],
enterprise_api_base_url: str | None = None,
):
self._model_registry = {}
self._base_name = self._sanitize_name(name)
schema_props, required = self._extract_schema_info(action_schema)
# Define field definitions for the model
field_definitions = {}
for param_name, param_details in schema_props.items():
param_desc = param_details.get("description", "")
is_required = param_name in required
try:
field_type = self._process_schema_type(
param_details, self._sanitize_name(param_name).title()
)
except Exception as e:
print(f"Warning: Could not process schema for {param_name}: {e}")
field_type = str
# Create field definition based on requirement
field_definitions[param_name] = self._create_field_definition(
field_type, is_required, param_desc
)
# Create the model
if field_definitions:
try:
args_schema = create_model(
f"{self._base_name}Schema", **field_definitions
)
except Exception as e:
print(f"Warning: Could not create main schema model: {e}")
args_schema = create_model(
f"{self._base_name}Schema",
input_text=(str, Field(description="Input for the action")),
)
else:
# Fallback for empty schema
args_schema = create_model(
f"{self._base_name}Schema",
input_text=(str, Field(description="Input for the action")),
)
super().__init__(name=name, description=description, args_schema=args_schema)
self.enterprise_action_token = enterprise_action_token
self.action_name = action_name
self.action_schema = action_schema
self.enterprise_api_base_url = (
enterprise_api_base_url or get_enterprise_api_base_url()
)
def _sanitize_name(self, name: str) -> str:
"""Sanitize names to create proper Python class names."""
sanitized = re.sub(r"[^a-zA-Z0-9_]", "", name)
parts = sanitized.split("_")
return "".join(word.capitalize() for word in parts if word)
def _extract_schema_info(
self, action_schema: dict[str, Any]
) -> tuple[dict[str, Any], list[str]]:
"""Extract schema properties and required fields from action schema."""
schema_props = (
action_schema.get("function", {})
.get("parameters", {})
.get("properties", {})
)
required = (
action_schema.get("function", {}).get("parameters", {}).get("required", [])
)
return schema_props, required
def _process_schema_type(self, schema: dict[str, Any], type_name: str) -> type[Any]:
"""Process a JSON schema and return appropriate Python type."""
if "anyOf" in schema:
any_of_types = schema["anyOf"]
is_nullable = any(t.get("type") == "null" for t in any_of_types)
non_null_types = [t for t in any_of_types if t.get("type") != "null"]
if non_null_types:
base_type = self._process_schema_type(non_null_types[0], type_name)
return Optional[base_type] if is_nullable else base_type
return cast(type[Any], Optional[str])
if "oneOf" in schema:
return self._process_schema_type(schema["oneOf"][0], type_name)
if "allOf" in schema:
return self._process_schema_type(schema["allOf"][0], type_name)
json_type = schema.get("type", "string")
if "enum" in schema:
enum_values = schema["enum"]
if not enum_values:
return self._map_json_type_to_python(json_type)
return Literal[tuple(enum_values)] # type: ignore[return-value]
if json_type == "array":
items_schema = schema.get("items", {"type": "string"})
item_type = self._process_schema_type(items_schema, f"{type_name}Item")
return list[item_type]
if json_type == "object":
return self._create_nested_model(schema, type_name)
return self._map_json_type_to_python(json_type)
def _create_nested_model(
self, schema: dict[str, Any], model_name: str
) -> type[Any]:
"""Create a nested Pydantic model for complex objects."""
full_model_name = f"{self._base_name}{model_name}"
if full_model_name in self._model_registry:
return self._model_registry[full_model_name]
properties = schema.get("properties", {})
required_fields = schema.get("required", [])
if not properties:
return dict
field_definitions = {}
for prop_name, prop_schema in properties.items():
prop_desc = prop_schema.get("description", "")
is_required = prop_name in required_fields
try:
prop_type = self._process_schema_type(
prop_schema, f"{model_name}{self._sanitize_name(prop_name).title()}"
)
except Exception as e:
print(f"Warning: Could not process schema for {prop_name}: {e}")
prop_type = str
field_definitions[prop_name] = self._create_field_definition(
prop_type, is_required, prop_desc
)
try:
nested_model = create_model(full_model_name, **field_definitions)
self._model_registry[full_model_name] = nested_model
return nested_model
except Exception as e:
print(f"Warning: Could not create nested model {full_model_name}: {e}")
return dict
def _create_field_definition(
self, field_type: type[Any], is_required: bool, description: str
) -> tuple:
"""Create Pydantic field definition based on type and requirement."""
if is_required:
return (field_type, Field(description=description))
if get_origin(field_type) is Union:
return (field_type, Field(default=None, description=description))
return (
Optional[field_type],
Field(default=None, description=description),
)
def _map_json_type_to_python(self, json_type: str) -> type[Any]:
"""Map basic JSON schema types to Python types."""
type_mapping = {
"string": str,
"integer": int,
"number": float,
"boolean": bool,
"array": list,
"object": dict,
"null": type(None),
}
return type_mapping.get(json_type, str)
def _get_required_nullable_fields(self) -> list[str]:
"""Get a list of required nullable fields from the action schema."""
schema_props, required = self._extract_schema_info(self.action_schema)
required_nullable_fields = []
for param_name in required:
param_details = schema_props.get(param_name, {})
if self._is_nullable_type(param_details):
required_nullable_fields.append(param_name)
return required_nullable_fields
def _is_nullable_type(self, schema: dict[str, Any]) -> bool:
"""Check if a schema represents a nullable type."""
if "anyOf" in schema:
return any(t.get("type") == "null" for t in schema["anyOf"])
return schema.get("type") == "null"
def _run(self, **kwargs) -> str:
"""Execute the specific enterprise action with validated parameters."""
try:
cleaned_kwargs = {}
for key, value in kwargs.items():
if value is not None:
cleaned_kwargs[key] = value
required_nullable_fields = self._get_required_nullable_fields()
for field_name in required_nullable_fields:
if field_name not in cleaned_kwargs:
cleaned_kwargs[field_name] = None
api_url = (
f"{self.enterprise_api_base_url}/actions/{self.action_name}/execute"
)
headers = {
"Authorization": f"Bearer {self.enterprise_action_token}",
"Content-Type": "application/json",
}
payload = cleaned_kwargs
response = requests.post(
url=api_url, headers=headers, json=payload, timeout=60
)
data = response.json()
if not response.ok:
error_message = data.get("error", {}).get("message", json.dumps(data))
return f"API request failed: {error_message}"
return json.dumps(data, indent=2)
except Exception as e:
return f"Error executing action {self.action_name}: {e!s}"
class EnterpriseActionKitToolAdapter:
"""Adapter that creates BaseTool instances for enterprise actions."""
def __init__(
self,
enterprise_action_token: str,
enterprise_api_base_url: str | None = None,
):
"""Initialize the adapter with an enterprise action token."""
self._set_enterprise_action_token(enterprise_action_token)
self._actions_schema = {}
self._tools = None
self.enterprise_api_base_url = (
enterprise_api_base_url or get_enterprise_api_base_url()
)
def tools(self) -> list[BaseTool]:
"""Get the list of tools created from enterprise actions."""
if self._tools is None:
self._fetch_actions()
self._create_tools()
return self._tools or []
def _fetch_actions(self):
"""Fetch available actions from the API."""
try:
actions_url = f"{self.enterprise_api_base_url}/actions"
headers = {"Authorization": f"Bearer {self.enterprise_action_token}"}
response = requests.get(actions_url, headers=headers, timeout=30)
response.raise_for_status()
raw_data = response.json()
if "actions" not in raw_data:
print(f"Unexpected API response structure: {raw_data}")
return
parsed_schema = {}
action_categories = raw_data["actions"]
for action_list in action_categories.values():
if isinstance(action_list, list):
for action in action_list:
action_name = action.get("name")
if action_name:
action_schema = {
"function": {
"name": action_name,
"description": action.get(
"description", f"Execute {action_name}"
),
"parameters": action.get("parameters", {}),
}
}
parsed_schema[action_name] = action_schema
self._actions_schema = parsed_schema
except Exception as e:
print(f"Error fetching actions: {e}")
import traceback
traceback.print_exc()
def _generate_detailed_description(
self, schema: dict[str, Any], indent: int = 0
) -> list[str]:
"""Generate detailed description for nested schema structures."""
descriptions = []
indent_str = " " * indent
schema_type = schema.get("type", "string")
if schema_type == "object":
properties = schema.get("properties", {})
required_fields = schema.get("required", [])
if properties:
descriptions.append(f"{indent_str}Object with properties:")
for prop_name, prop_schema in properties.items():
prop_desc = prop_schema.get("description", "")
is_required = prop_name in required_fields
req_str = " (required)" if is_required else " (optional)"
descriptions.append(
f"{indent_str} - {prop_name}: {prop_desc}{req_str}"
)
if prop_schema.get("type") == "object":
descriptions.extend(
self._generate_detailed_description(prop_schema, indent + 2)
)
elif prop_schema.get("type") == "array":
items_schema = prop_schema.get("items", {})
if items_schema.get("type") == "object":
descriptions.append(f"{indent_str} Array of objects:")
descriptions.extend(
self._generate_detailed_description(
items_schema, indent + 3
)
)
elif "enum" in items_schema:
descriptions.append(
f"{indent_str} Array of enum values: {items_schema['enum']}"
)
elif "enum" in prop_schema:
descriptions.append(
f"{indent_str} Enum values: {prop_schema['enum']}"
)
return descriptions
def _create_tools(self):
"""Create BaseTool instances for each action."""
tools = []
for action_name, action_schema in self._actions_schema.items():
function_details = action_schema.get("function", {})
description = function_details.get("description", f"Execute {action_name}")
parameters = function_details.get("parameters", {})
param_descriptions = []
if parameters.get("properties"):
param_descriptions.append("\nDetailed Parameter Structure:")
param_descriptions.extend(
self._generate_detailed_description(parameters)
)
full_description = description + "\n".join(param_descriptions)
tool = EnterpriseActionTool(
name=action_name.lower().replace(" ", "_"),
description=full_description,
action_name=action_name,
action_schema=action_schema,
enterprise_action_token=self.enterprise_action_token,
enterprise_api_base_url=self.enterprise_api_base_url,
)
tools.append(tool)
self._tools = tools
def _set_enterprise_action_token(self, enterprise_action_token: str | None):
if enterprise_action_token and not enterprise_action_token.startswith("PK_"):
warnings.warn(
"Legacy token detected, please consider using the new Enterprise Action Auth token. Check out our docs for more information https://docs.crewai.com/en/enterprise/features/integrations.",
DeprecationWarning,
stacklevel=2,
)
token = enterprise_action_token or os.environ.get(
"CREWAI_ENTERPRISE_TOOLS_TOKEN"
)
self.enterprise_action_token = token
def __enter__(self):
return self.tools()
def __exit__(self, exc_type, exc_val, exc_tb):
pass

View File

@@ -1,56 +0,0 @@
from collections.abc import Callable
from pathlib import Path
from typing import Any
from crewai_tools.tools.rag.rag_tool import Adapter
from lancedb import DBConnection as LanceDBConnection
from lancedb import connect as lancedb_connect
from lancedb.table import Table as LanceDBTable
from openai import Client as OpenAIClient
from pydantic import Field, PrivateAttr
def _default_embedding_function():
client = OpenAIClient()
def _embedding_function(input):
rs = client.embeddings.create(input=input, model="text-embedding-ada-002")
return [record.embedding for record in rs.data]
return _embedding_function
class LanceDBAdapter(Adapter):
uri: str | Path
table_name: str
embedding_function: Callable = Field(default_factory=_default_embedding_function)
top_k: int = 3
vector_column_name: str = "vector"
text_column_name: str = "text"
_db: LanceDBConnection = PrivateAttr()
_table: LanceDBTable = PrivateAttr()
def model_post_init(self, __context: Any) -> None:
self._db = lancedb_connect(self.uri)
self._table = self._db.open_table(self.table_name)
super().model_post_init(__context)
def query(self, question: str) -> str:
query = self.embedding_function([question])[0]
results = (
self._table.search(query, vector_column_name=self.vector_column_name)
.limit(self.top_k)
.select([self.text_column_name])
.to_list()
)
values = [result[self.text_column_name] for result in results]
return "\n".join(values)
def add(
self,
*args: Any,
**kwargs: Any,
) -> None:
self._table.add(*args, **kwargs)

View File

@@ -1,166 +0,0 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
from crewai.tools import BaseTool
from crewai_tools.adapters.tool_collection import ToolCollection
"""
MCPServer for CrewAI.
"""
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from mcp import StdioServerParameters
from mcpadapt.core import MCPAdapt
from mcpadapt.crewai_adapter import CrewAIAdapter
try:
from mcp import StdioServerParameters
from mcpadapt.core import MCPAdapt
from mcpadapt.crewai_adapter import CrewAIAdapter
MCP_AVAILABLE = True
except ImportError:
MCP_AVAILABLE = False
class MCPServerAdapter:
"""Manages the lifecycle of an MCP server and make its tools available to CrewAI.
Note: tools can only be accessed after the server has been started with the
`start()` method.
Attributes:
tools: The CrewAI tools available from the MCP server.
Usage:
# context manager + stdio
with MCPServerAdapter(...) as tools:
# tools is now available
# context manager + sse
with MCPServerAdapter({"url": "http://localhost:8000/sse"}) as tools:
# tools is now available
# context manager with filtered tools
with MCPServerAdapter(..., "tool1", "tool2") as filtered_tools:
# only tool1 and tool2 are available
# context manager with custom connect timeout (60 seconds)
with MCPServerAdapter(..., connect_timeout=60) as tools:
# tools is now available with longer timeout
# manually stop mcp server
try:
mcp_server = MCPServerAdapter(...)
tools = mcp_server.tools # all tools
# or with filtered tools and custom timeout
mcp_server = MCPServerAdapter(..., "tool1", "tool2", connect_timeout=45)
filtered_tools = mcp_server.tools # only tool1 and tool2
...
finally:
mcp_server.stop()
        # Best practice is to ensure cleanup is done after use.
mcp_server.stop() # run after crew().kickoff()
"""
def __init__(
self,
serverparams: StdioServerParameters | dict[str, Any],
*tool_names: str,
connect_timeout: int = 30,
):
"""Initialize the MCP Server
Args:
            serverparams: The parameters for the MCP server. It supports either a
                `StdioServerParameters` or a `dict`, respectively for STDIO and SSE.
*tool_names: Optional names of tools to filter. If provided, only tools with
matching names will be available.
connect_timeout: Connection timeout in seconds to the MCP server (default is 30s).
"""
super().__init__()
self._adapter = None
self._tools = None
self._tool_names = list(tool_names) if tool_names else None
if not MCP_AVAILABLE:
import click
if click.confirm(
"You are missing the 'mcp' package. Would you like to install it?"
):
import subprocess
try:
subprocess.run(["uv", "add", "mcp crewai-tools[mcp]"], check=True)
except subprocess.CalledProcessError as e:
raise ImportError("Failed to install mcp package") from e
else:
raise ImportError(
"`mcp` package not found, please run `uv add crewai-tools[mcp]`"
)
try:
self._serverparams = serverparams
self._adapter = MCPAdapt(
self._serverparams, CrewAIAdapter(), connect_timeout
)
self.start()
except Exception as e:
if self._adapter is not None:
try:
self.stop()
except Exception as stop_e:
logger.error(f"Error during stop cleanup: {stop_e}")
raise RuntimeError(f"Failed to initialize MCP Adapter: {e}") from e
def start(self):
"""Start the MCP server and initialize the tools."""
self._tools = self._adapter.__enter__()
def stop(self):
"""Stop the MCP server"""
self._adapter.__exit__(None, None, None)
@property
def tools(self) -> ToolCollection[BaseTool]:
"""The CrewAI tools available from the MCP server.
Raises:
ValueError: If the MCP server is not started.
Returns:
The CrewAI tools available from the MCP server.
"""
if self._tools is None:
raise ValueError(
"MCP server not started, run `mcp_server.start()` first before accessing `tools`"
)
tools_collection = ToolCollection(self._tools)
if self._tool_names:
return tools_collection.filter_by_names(self._tool_names)
return tools_collection
def __enter__(self):
"""
Enter the context manager. Note that `__init__()` already starts the MCP server.
So tools should already be available.
"""
return self.tools
def __exit__(self, exc_type, exc_value, traceback):
"""Exit the context manager."""
return self._adapter.__exit__(exc_type, exc_value, traceback)

View File

@@ -1,38 +0,0 @@
from typing import Any
from crewai_tools.rag.core import RAG
from crewai_tools.tools.rag.rag_tool import Adapter
class RAGAdapter(Adapter):
def __init__(
self,
collection_name: str = "crewai_knowledge_base",
persist_directory: str | None = None,
embedding_model: str = "text-embedding-3-small",
top_k: int = 5,
embedding_api_key: str | None = None,
**embedding_kwargs,
):
super().__init__()
# Prepare embedding configuration
embedding_config = {"api_key": embedding_api_key, **embedding_kwargs}
self._adapter = RAG(
collection_name=collection_name,
persist_directory=persist_directory,
embedding_model=embedding_model,
top_k=top_k,
embedding_config=embedding_config,
)
def query(self, question: str) -> str:
return self._adapter.query(question)
def add(
self,
*args: Any,
**kwargs: Any,
) -> None:
self._adapter.add(*args, **kwargs)

View File

@@ -1,77 +0,0 @@
from collections.abc import Callable
from typing import Generic, TypeVar
from crewai.tools import BaseTool
T = TypeVar("T", bound=BaseTool)
class ToolCollection(list, Generic[T]):
"""
A collection of tools that can be accessed by index or name
This class extends the built-in list to provide dictionary-like
access to tools based on their name property.
Usage:
tools = ToolCollection(list_of_tools)
# Access by index (regular list behavior)
first_tool = tools[0]
# Access by name (new functionality)
search_tool = tools["search"]
"""
def __init__(self, tools: list[T] | None = None):
super().__init__(tools or [])
self._name_cache: dict[str, T] = {}
self._build_name_cache()
def _build_name_cache(self) -> None:
self._name_cache = {tool.name.lower(): tool for tool in self}
def __getitem__(self, key: int | str) -> T:
if isinstance(key, str):
return self._name_cache[key.lower()]
return super().__getitem__(key)
def append(self, tool: T) -> None:
super().append(tool)
self._name_cache[tool.name.lower()] = tool
def extend(self, tools: list[T]) -> None:
super().extend(tools)
self._build_name_cache()
def insert(self, index: int, tool: T) -> None:
super().insert(index, tool)
self._name_cache[tool.name.lower()] = tool
def remove(self, tool: T) -> None:
super().remove(tool)
if tool.name.lower() in self._name_cache:
del self._name_cache[tool.name.lower()]
def pop(self, index: int = -1) -> T:
tool = super().pop(index)
if tool.name.lower() in self._name_cache:
del self._name_cache[tool.name.lower()]
return tool
def filter_by_names(self, names: list[str] | None = None) -> "ToolCollection[T]":
if names is None:
return self
return ToolCollection(
[
tool
for name in names
if (tool := self._name_cache.get(name.lower())) is not None
]
)
def filter_where(self, func: Callable[[T], bool]) -> "ToolCollection[T]":
return ToolCollection([tool for tool in self if func(tool)])
def clear(self) -> None:
super().clear()
self._name_cache.clear()

View File

@@ -1,120 +0,0 @@
import logging
import os
import requests
from crewai.tools import BaseTool
from pydantic import Field, create_model
ACTIONS_URL = "https://actions.zapier.com/api/v2/ai-actions"
logger = logging.getLogger(__name__)
class ZapierActionTool(BaseTool):
"""
A tool that wraps a Zapier action
"""
name: str = Field(description="Tool name")
description: str = Field(description="Tool description")
action_id: str = Field(description="Zapier action ID")
api_key: str = Field(description="Zapier API key")
def _run(self, **kwargs) -> str:
"""Execute the Zapier action"""
headers = {"x-api-key": self.api_key, "Content-Type": "application/json"}
instructions = kwargs.pop(
"instructions", "Execute this action with the provided parameters"
)
if not kwargs:
action_params = {"instructions": instructions, "params": {}}
else:
formatted_params = {}
for key, value in kwargs.items():
formatted_params[key] = {
"value": value,
"mode": "guess",
}
action_params = {"instructions": instructions, "params": formatted_params}
execute_url = f"{ACTIONS_URL}/{self.action_id}/execute/"
response = requests.request(
"POST", execute_url, headers=headers, json=action_params, timeout=30
)
response.raise_for_status()
return response.json()
class ZapierActionsAdapter:
"""
Adapter for Zapier Actions
"""
api_key: str
def __init__(self, api_key: str | None = None):
self.api_key = api_key or os.getenv("ZAPIER_API_KEY")
if not self.api_key:
logger.error("Zapier Actions API key is required")
raise ValueError("Zapier Actions API key is required")
def get_zapier_actions(self):
headers = {
"x-api-key": self.api_key,
}
response = requests.request("GET", ACTIONS_URL, headers=headers, timeout=30)
response.raise_for_status()
return response.json()
def tools(self) -> list[BaseTool]:
"""Convert Zapier actions to BaseTool instances"""
actions_response = self.get_zapier_actions()
tools = []
for action in actions_response.get("results", []):
tool_name = (
action["meta"]["action_label"]
.replace(" ", "_")
.replace(":", "")
.lower()
)
params = action.get("params", {})
args_fields = {}
args_fields["instructions"] = (
str,
Field(description="Instructions for how to execute this action"),
)
for param_name, param_info in params.items():
field_type = (
str # Default to string, could be enhanced based on param_info
)
field_description = (
param_info.get("description", "")
if isinstance(param_info, dict)
else ""
)
args_fields[param_name] = (
field_type,
Field(description=field_description),
)
args_schema = create_model(f"{tool_name.title()}Schema", **args_fields)
tool = ZapierActionTool(
name=tool_name,
description=action["description"],
action_id=action["id"],
api_key=self.api_key,
args_schema=args_schema,
)
tools.append(tool)
return tools

View File

@@ -1,16 +0,0 @@
from .bedrock import (
BedrockInvokeAgentTool,
BedrockKBRetrieverTool,
create_browser_toolkit,
create_code_interpreter_toolkit,
)
from .s3 import S3ReaderTool, S3WriterTool
__all__ = [
"BedrockInvokeAgentTool",
"BedrockKBRetrieverTool",
"S3ReaderTool",
"S3WriterTool",
"create_browser_toolkit",
"create_code_interpreter_toolkit",
]

View File

@@ -1,11 +0,0 @@
from .agents.invoke_agent_tool import BedrockInvokeAgentTool
from .browser import create_browser_toolkit
from .code_interpreter import create_code_interpreter_toolkit
from .knowledge_base.retriever_tool import BedrockKBRetrieverTool
__all__ = [
"BedrockInvokeAgentTool",
"BedrockKBRetrieverTool",
"create_browser_toolkit",
"create_code_interpreter_toolkit",
]

View File

@@ -1,181 +0,0 @@
# BedrockInvokeAgentTool
The `BedrockInvokeAgentTool` enables CrewAI agents to invoke Amazon Bedrock Agents and leverage their capabilities within your workflows.
## Installation
```bash
pip install 'crewai[tools]'
```
## Requirements
- AWS credentials configured (either through environment variables or AWS CLI)
- `boto3` and `python-dotenv` packages
- Access to Amazon Bedrock Agents
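The Python dependencies can be added up front; a minimal sketch, using the same installer shown elsewhere in these docs:
```bash
# boto3 provides the Bedrock Agent Runtime client; python-dotenv loads .env files
uv add boto3 python-dotenv
```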
## Usage
Here's how to use the tool with a CrewAI agent:
```python
from crewai import Agent, Task, Crew
from crewai_tools.aws.bedrock.agents.invoke_agent_tool import BedrockInvokeAgentTool
# Initialize the tool
agent_tool = BedrockInvokeAgentTool(
agent_id="your-agent-id",
agent_alias_id="your-agent-alias-id"
)
# Create a CrewAI agent that uses the tool
aws_expert = Agent(
role='AWS Service Expert',
goal='Help users understand AWS services and quotas',
backstory='I am an expert in AWS services and can provide detailed information about them.',
tools=[agent_tool],
verbose=True
)
# Create a task for the agent
quota_task = Task(
description="Find out the current service quotas for EC2 in us-west-2 and explain any recent changes.",
agent=aws_expert
)
# Create a crew with the agent
crew = Crew(
agents=[aws_expert],
tasks=[quota_task],
verbose=True
)
# Run the crew
result = crew.kickoff()
print(result)
```
## Tool Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| agent_id | str | Yes | None | The unique identifier of the Bedrock agent |
| agent_alias_id | str | Yes | None | The unique identifier of the agent alias |
| session_id | str | No | timestamp | The unique identifier of the session |
| enable_trace | bool | No | False | Whether to enable trace for debugging |
| end_session | bool | No | False | Whether to end the session after invocation |
| description | str | No | None | Custom description for the tool |
## Environment Variables
```bash
BEDROCK_AGENT_ID=your-agent-id # Alternative to passing agent_id
BEDROCK_AGENT_ALIAS_ID=your-agent-alias-id # Alternative to passing agent_alias_id
AWS_REGION=your-aws-region # Defaults to us-west-2
AWS_ACCESS_KEY_ID=your-access-key # Required for AWS authentication
AWS_SECRET_ACCESS_KEY=your-secret-key # Required for AWS authentication
```
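When these variables are set, the tool can be constructed without passing the identifiers explicitly, since the constructor falls back to `BEDROCK_AGENT_ID` and `BEDROCK_AGENT_ALIAS_ID`; a minimal sketch:
```python
from crewai_tools.aws.bedrock.agents.invoke_agent_tool import BedrockInvokeAgentTool

# Assumes BEDROCK_AGENT_ID and BEDROCK_AGENT_ALIAS_ID are exported in the environment;
# AWS_REGION is optional and defaults to us-west-2.
agent_tool = BedrockInvokeAgentTool(
    enable_trace=True,  # optional: surface trace output for debugging
)
```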
## Advanced Usage
### Multi-Agent Workflow with Session Management
```python
from crewai import Agent, Task, Crew, Process
from crewai_tools.aws.bedrock.agents.invoke_agent_tool import BedrockInvokeAgentTool
# Initialize tools with session management
initial_tool = BedrockInvokeAgentTool(
agent_id="your-agent-id",
agent_alias_id="your-agent-alias-id",
session_id="custom-session-id"
)
followup_tool = BedrockInvokeAgentTool(
agent_id="your-agent-id",
agent_alias_id="your-agent-alias-id",
session_id="custom-session-id"
)
final_tool = BedrockInvokeAgentTool(
agent_id="your-agent-id",
agent_alias_id="your-agent-alias-id",
session_id="custom-session-id",
end_session=True
)
# Create agents for different stages
researcher = Agent(
role='AWS Service Researcher',
goal='Gather information about AWS services',
backstory='I am specialized in finding detailed AWS service information.',
tools=[initial_tool]
)
analyst = Agent(
role='Service Compatibility Analyst',
goal='Analyze service compatibility and requirements',
backstory='I analyze AWS services for compatibility and integration possibilities.',
tools=[followup_tool]
)
summarizer = Agent(
role='Technical Documentation Writer',
goal='Create clear technical summaries',
backstory='I specialize in creating clear, concise technical documentation.',
tools=[final_tool]
)
# Create tasks
research_task = Task(
description="Find all available AWS services in us-west-2 region.",
agent=researcher
)
analysis_task = Task(
description="Analyze which services support IPv6 and their implementation requirements.",
agent=analyst
)
summary_task = Task(
description="Create a summary of IPv6-compatible services and their key features.",
agent=summarizer
)
# Create a crew with the agents and tasks
crew = Crew(
agents=[researcher, analyst, summarizer],
tasks=[research_task, analysis_task, summary_task],
process=Process.sequential,
verbose=True
)
# Run the crew
result = crew.kickoff()
```
## Use Cases
### Hybrid Multi-Agent Collaborations
- Create workflows where CrewAI agents collaborate with managed Bedrock agents running as services in AWS
- Enable scenarios where sensitive data processing happens within your AWS environment while other agents operate externally
- Bridge on-premises CrewAI agents with cloud-based Bedrock agents for distributed intelligence workflows
### Data Sovereignty and Compliance
- Keep data-sensitive agentic workflows within your AWS environment while allowing external CrewAI agents to orchestrate tasks
- Maintain compliance with data residency requirements by processing sensitive information only within your AWS account
- Enable secure multi-agent collaborations where some agents cannot access your organization's private data
### Seamless AWS Service Integration
- Access any AWS service through Amazon Bedrock Actions without writing complex integration code
- Enable CrewAI agents to interact with AWS services through natural language requests
- Leverage pre-built Bedrock agent capabilities to interact with AWS services like Bedrock Knowledge Bases, Lambda, and more
### Scalable Hybrid Agent Architectures
- Offload computationally intensive tasks to managed Bedrock agents while lightweight tasks run in CrewAI
- Scale agent processing by distributing workloads between local CrewAI agents and cloud-based Bedrock agents
### Cross-Organizational Agent Collaboration
- Enable secure collaboration between your organization's CrewAI agents and partner organizations' Bedrock agents
- Create workflows where external expertise from Bedrock agents can be incorporated without exposing sensitive data
- Build agent ecosystems that span organizational boundaries while maintaining security and data control

View File

@@ -1,3 +0,0 @@
from .invoke_agent_tool import BedrockInvokeAgentTool
__all__ = ["BedrockInvokeAgentTool"]

View File

@@ -1,184 +0,0 @@
import json
import os
import time
from datetime import datetime, timezone
from typing import ClassVar
from crewai.tools import BaseTool
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from ..exceptions import BedrockAgentError, BedrockValidationError
# Load environment variables from .env file
load_dotenv()
class BedrockInvokeAgentToolInput(BaseModel):
"""Input schema for BedrockInvokeAgentTool."""
query: str = Field(..., description="The query to send to the agent")
class BedrockInvokeAgentTool(BaseTool):
name: str = "Bedrock Agent Invoke Tool"
description: str = "An agent responsible for policy analysis."
args_schema: type[BaseModel] = BedrockInvokeAgentToolInput
agent_id: str | None = None
agent_alias_id: str | None = None
session_id: str | None = None
enable_trace: bool = False
end_session: bool = False
package_dependencies: ClassVar[list[str]] = ["boto3"]
def __init__(
self,
agent_id: str | None = None,
agent_alias_id: str | None = None,
session_id: str | None = None,
enable_trace: bool = False,
end_session: bool = False,
description: str | None = None,
**kwargs,
):
"""Initialize the BedrockInvokeAgentTool with agent configuration.
Args:
agent_id (str): The unique identifier of the Bedrock agent
agent_alias_id (str): The unique identifier of the agent alias
session_id (str): The unique identifier of the session
enable_trace (bool): Whether to enable trace for the agent invocation
end_session (bool): Whether to end the session with the agent
description (Optional[str]): Custom description for the tool
"""
super().__init__(**kwargs)
# Get values from environment variables if not provided
self.agent_id = agent_id or os.getenv("BEDROCK_AGENT_ID")
self.agent_alias_id = agent_alias_id or os.getenv("BEDROCK_AGENT_ALIAS_ID")
self.session_id = session_id or str(
int(time.time())
) # Use timestamp as session ID if not provided
self.enable_trace = enable_trace
self.end_session = end_session
# Update the description if provided
if description:
self.description = description
# Validate parameters
self._validate_parameters()
def _validate_parameters(self):
"""Validate the parameters according to AWS API requirements."""
try:
# Validate agent_id
if not self.agent_id:
raise BedrockValidationError("agent_id cannot be empty")
if not isinstance(self.agent_id, str):
raise BedrockValidationError("agent_id must be a string")
# Validate agent_alias_id
if not self.agent_alias_id:
raise BedrockValidationError("agent_alias_id cannot be empty")
if not isinstance(self.agent_alias_id, str):
raise BedrockValidationError("agent_alias_id must be a string")
# Validate session_id if provided
if self.session_id and not isinstance(self.session_id, str):
raise BedrockValidationError("session_id must be a string")
except BedrockValidationError as e:
raise BedrockValidationError(f"Parameter validation failed: {e!s}") from e
def _run(self, query: str) -> str:
try:
import boto3
from botocore.exceptions import ClientError
except ImportError as e:
raise ImportError(
"`boto3` package not found, please run `uv add boto3`"
) from e
try:
# Initialize the Bedrock Agent Runtime client
bedrock_agent = boto3.client(
"bedrock-agent-runtime",
region_name=os.getenv(
"AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "us-west-2")
),
)
# Format the prompt with current time
current_utc = datetime.now(timezone.utc)
prompt = f"""
The current time is: {current_utc}
Below is the user's query or task. Complete it and answer it concisely and to the point:
{query}
"""
# Invoke the agent
response = bedrock_agent.invoke_agent(
agentId=self.agent_id,
agentAliasId=self.agent_alias_id,
sessionId=self.session_id,
inputText=prompt,
enableTrace=self.enable_trace,
endSession=self.end_session,
)
# Process the response
completion = ""
# Check if response contains a completion field
if "completion" in response:
# Process streaming response format
for event in response.get("completion", []):
if "chunk" in event and "bytes" in event["chunk"]:
chunk_bytes = event["chunk"]["bytes"]
if isinstance(chunk_bytes, (bytes, bytearray)):
completion += chunk_bytes.decode("utf-8")
else:
completion += str(chunk_bytes)
# If no completion found in streaming format, try direct format
if not completion and "chunk" in response and "bytes" in response["chunk"]:
chunk_bytes = response["chunk"]["bytes"]
if isinstance(chunk_bytes, (bytes, bytearray)):
completion = chunk_bytes.decode("utf-8")
else:
completion = str(chunk_bytes)
# If still no completion, return debug info
if not completion:
debug_info = {
"error": "Could not extract completion from response",
"response_keys": list(response.keys()),
}
# Add more debug info
if "chunk" in response:
debug_info["chunk_keys"] = list(response["chunk"].keys())
raise BedrockAgentError(
f"Failed to extract completion: {json.dumps(debug_info, indent=2)}"
)
return completion
except ClientError as e:
error_code = "Unknown"
error_message = str(e)
# Try to extract error code if available
if hasattr(e, "response") and "Error" in e.response:
error_code = e.response["Error"].get("Code", "Unknown")
error_message = e.response["Error"].get("Message", str(e))
raise BedrockAgentError(f"Error ({error_code}): {error_message}") from e
except BedrockAgentError:
# Re-raise BedrockAgentError exceptions
raise
except Exception as e:
raise BedrockAgentError(f"Unexpected error: {e!s}") from e

View File

@@ -1,158 +0,0 @@
# AWS Bedrock Browser Tools
This toolkit provides a set of tools for interacting with web browsers through AWS Bedrock Browser. It enables your CrewAI agents to navigate websites, extract content, click elements, and more.
## Features
- Navigate to URLs and browse the web
- Extract text and hyperlinks from pages
- Click on elements using CSS selectors
- Navigate back through browser history
- Get information about the current webpage
- Multiple browser sessions with thread-based isolation
## Installation
Ensure you have the necessary dependencies:
```bash
uv add crewai-tools bedrock-agentcore beautifulsoup4 playwright nest-asyncio
```
## Usage
### Basic Usage
```python
from crewai import Agent, Task, Crew, LLM
from crewai_tools.aws.bedrock.browser import create_browser_toolkit
# Create the browser toolkit
toolkit, browser_tools = create_browser_toolkit(region="us-west-2")
# Create the Bedrock LLM
llm = LLM(
model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
region_name="us-west-2",
)
# Create a CrewAI agent that uses the browser tools
research_agent = Agent(
role="Web Researcher",
goal="Research and summarize web content",
backstory="You're an expert at finding information online.",
tools=browser_tools,
llm=llm
)
# Create a task for the agent
research_task = Task(
description="Navigate to https://example.com and extract all text content. Summarize the main points.",
expected_output="A list of bullet points containing the most important information on https://example.com. Plus, a description of the tool calls used, and actions performed to get to the page.",
agent=research_agent
)
# Create and run the crew
crew = Crew(
agents=[research_agent],
tasks=[research_task]
)
result = crew.kickoff()
print(f"\n***Final result:***\n\n{result}")
# Clean up browser resources when done
toolkit.sync_cleanup()
```
### Available Tools
The toolkit provides the following tools:
1. `navigate_browser` - Navigate to a URL
2. `click_element` - Click on an element using CSS selectors
3. `extract_text` - Extract all text from the current webpage
4. `extract_hyperlinks` - Extract all hyperlinks from the current webpage
5. `get_elements` - Get elements matching a CSS selector
6. `navigate_back` - Navigate to the previous page
7. `current_webpage` - Get information about the current webpage
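Every tool also accepts an optional `thread_id` argument (default `"default"`), which maps to the thread-based session isolation mentioned under Features; a minimal sketch, assuming the tools are invoked directly via `BaseTool.run` rather than by an agent:
```python
from crewai_tools.aws.bedrock.browser import create_browser_toolkit

toolkit, browser_tools = create_browser_toolkit(region="us-west-2")
tools_by_name = toolkit.get_tools_by_name()

# Two independent browser sessions, one per thread_id
nav = tools_by_name["navigate_browser"]
print(nav.run(url="https://example.com", thread_id="session-a"))
print(nav.run(url="https://example.org", thread_id="session-b"))

# Clean up both sessions when done
toolkit.sync_cleanup()
```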
### Advanced Usage (with async)
```python
import asyncio
from crewai import Agent, Task, Crew, LLM
from crewai_tools.aws.bedrock.browser import create_browser_toolkit
async def main():
# Create the browser toolkit with specific AWS region
toolkit, browser_tools = create_browser_toolkit(region="us-west-2")
tools_by_name = toolkit.get_tools_by_name()
# Create the Bedrock LLM
llm = LLM(
model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
region_name="us-west-2",
)
# Create agents with specific tools
navigator_agent = Agent(
role="Navigator",
goal="Find specific information across websites",
backstory="You navigate through websites to locate information.",
tools=[
tools_by_name["navigate_browser"],
tools_by_name["click_element"],
tools_by_name["navigate_back"]
],
llm=llm
)
content_agent = Agent(
role="Content Extractor",
goal="Extract and analyze webpage content",
backstory="You extract and analyze content from webpages.",
tools=[
tools_by_name["extract_text"],
tools_by_name["extract_hyperlinks"],
tools_by_name["get_elements"]
],
llm=llm
)
# Create tasks for the agents
navigation_task = Task(
description="Navigate to https://example.com, then click on the the 'More information...' link.",
expected_output="The status of the tool calls for this task.",
agent=navigator_agent,
)
extraction_task = Task(
description="Extract all text from the current page and summarize it.",
expected_output="The summary of the page, and a description of the tool calls used, and actions performed to get to the page.",
agent=content_agent,
)
# Create and run the crew
crew = Crew(
agents=[navigator_agent, content_agent],
tasks=[navigation_task, extraction_task]
)
result = await crew.kickoff_async()
# Clean up browser resources when done
toolkit.sync_cleanup()
return result
if __name__ == "__main__":
result = asyncio.run(main())
print(f"\n***Final result:***\n\n{result}")
```
## Requirements
- AWS account with access to Bedrock AgentCore API
- Properly configured AWS credentials
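Credentials are resolved through the standard AWS credential chain; a minimal sketch using environment variables (values are placeholders):
```bash
export AWS_ACCESS_KEY_ID=your-access-key
export AWS_SECRET_ACCESS_KEY=your-secret-key
export AWS_REGION=us-west-2
```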

View File

@@ -1,3 +0,0 @@
from .browser_toolkit import BrowserToolkit, create_browser_toolkit
__all__ = ["BrowserToolkit", "create_browser_toolkit"]

View File

@@ -1,262 +0,0 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from bedrock_agentcore.tools.browser_client import BrowserClient
from playwright.async_api import Browser as AsyncBrowser
from playwright.sync_api import Browser as SyncBrowser
logger = logging.getLogger(__name__)
class BrowserSessionManager:
"""
Manages browser sessions for different threads.
This class maintains separate browser sessions for different threads,
enabling concurrent usage of browsers in multi-threaded environments.
Browsers are created lazily only when needed by tools.
"""
def __init__(self, region: str = "us-west-2"):
"""
Initialize the browser session manager.
Args:
region: AWS region for browser client
"""
self.region = region
self._async_sessions: dict[str, tuple[BrowserClient, AsyncBrowser]] = {}
self._sync_sessions: dict[str, tuple[BrowserClient, SyncBrowser]] = {}
async def get_async_browser(self, thread_id: str) -> AsyncBrowser:
"""
Get or create an async browser for the specified thread.
Args:
thread_id: Unique identifier for the thread requesting the browser
Returns:
An async browser instance specific to the thread
"""
if thread_id in self._async_sessions:
return self._async_sessions[thread_id][1]
return await self._create_async_browser_session(thread_id)
def get_sync_browser(self, thread_id: str) -> SyncBrowser:
"""
Get or create a sync browser for the specified thread.
Args:
thread_id: Unique identifier for the thread requesting the browser
Returns:
A sync browser instance specific to the thread
"""
if thread_id in self._sync_sessions:
return self._sync_sessions[thread_id][1]
return self._create_sync_browser_session(thread_id)
async def _create_async_browser_session(self, thread_id: str) -> AsyncBrowser:
"""
Create a new async browser session for the specified thread.
Args:
thread_id: Unique identifier for the thread
Returns:
The newly created async browser instance
Raises:
Exception: If browser session creation fails
"""
from bedrock_agentcore.tools.browser_client import BrowserClient
browser_client = BrowserClient(region=self.region)
try:
# Start browser session
browser_client.start()
# Get WebSocket connection info
ws_url, headers = browser_client.generate_ws_headers()
logger.info(
f"Connecting to async WebSocket endpoint for thread {thread_id}: {ws_url}"
)
from playwright.async_api import async_playwright
# Connect to browser using Playwright
playwright = await async_playwright().start()
browser = await playwright.chromium.connect_over_cdp(
endpoint_url=ws_url, headers=headers, timeout=30000
)
logger.info(
f"Successfully connected to async browser for thread {thread_id}"
)
# Store session resources
self._async_sessions[thread_id] = (browser_client, browser)
return browser
except Exception as e:
logger.error(
f"Failed to create async browser session for thread {thread_id}: {e}"
)
# Clean up resources if session creation fails
if browser_client:
try:
browser_client.stop()
except Exception as cleanup_error:
logger.warning(f"Error cleaning up browser client: {cleanup_error}")
raise
def _create_sync_browser_session(self, thread_id: str) -> SyncBrowser:
"""
Create a new sync browser session for the specified thread.
Args:
thread_id: Unique identifier for the thread
Returns:
The newly created sync browser instance
Raises:
Exception: If browser session creation fails
"""
from bedrock_agentcore.tools.browser_client import BrowserClient
browser_client = BrowserClient(region=self.region)
try:
# Start browser session
browser_client.start()
# Get WebSocket connection info
ws_url, headers = browser_client.generate_ws_headers()
logger.info(
f"Connecting to sync WebSocket endpoint for thread {thread_id}: {ws_url}"
)
from playwright.sync_api import sync_playwright
# Connect to browser using Playwright
playwright = sync_playwright().start()
browser = playwright.chromium.connect_over_cdp(
endpoint_url=ws_url, headers=headers, timeout=30000
)
logger.info(
f"Successfully connected to sync browser for thread {thread_id}"
)
# Store session resources
self._sync_sessions[thread_id] = (browser_client, browser)
return browser
except Exception as e:
logger.error(
f"Failed to create sync browser session for thread {thread_id}: {e}"
)
# Clean up resources if session creation fails
if browser_client:
try:
browser_client.stop()
except Exception as cleanup_error:
logger.warning(f"Error cleaning up browser client: {cleanup_error}")
raise
async def close_async_browser(self, thread_id: str) -> None:
"""
Close the async browser session for the specified thread.
Args:
thread_id: Unique identifier for the thread
"""
if thread_id not in self._async_sessions:
logger.warning(f"No async browser session found for thread {thread_id}")
return
browser_client, browser = self._async_sessions[thread_id]
# Close browser
if browser:
try:
await browser.close()
except Exception as e:
logger.warning(
f"Error closing async browser for thread {thread_id}: {e}"
)
# Stop browser client
if browser_client:
try:
browser_client.stop()
except Exception as e:
logger.warning(
f"Error stopping browser client for thread {thread_id}: {e}"
)
# Remove session from dictionary
del self._async_sessions[thread_id]
logger.info(f"Async browser session cleaned up for thread {thread_id}")
def close_sync_browser(self, thread_id: str) -> None:
"""
Close the sync browser session for the specified thread.
Args:
thread_id: Unique identifier for the thread
"""
if thread_id not in self._sync_sessions:
logger.warning(f"No sync browser session found for thread {thread_id}")
return
browser_client, browser = self._sync_sessions[thread_id]
# Close browser
if browser:
try:
browser.close()
except Exception as e:
logger.warning(
f"Error closing sync browser for thread {thread_id}: {e}"
)
# Stop browser client
if browser_client:
try:
browser_client.stop()
except Exception as e:
logger.warning(
f"Error stopping browser client for thread {thread_id}: {e}"
)
# Remove session from dictionary
del self._sync_sessions[thread_id]
logger.info(f"Sync browser session cleaned up for thread {thread_id}")
async def close_all_browsers(self) -> None:
"""Close all browser sessions."""
# Close all async browsers
async_thread_ids = list(self._async_sessions.keys())
for thread_id in async_thread_ids:
await self.close_async_browser(thread_id)
# Close all sync browsers
sync_thread_ids = list(self._sync_sessions.keys())
for thread_id in sync_thread_ids:
self.close_sync_browser(thread_id)
logger.info("All browser sessions closed")

View File

@@ -1,615 +0,0 @@
"""Toolkit for navigating web with AWS browser."""
import asyncio
import json
import logging
from typing import Any
from urllib.parse import urlparse
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from .browser_session_manager import BrowserSessionManager
from .utils import aget_current_page, get_current_page
logger = logging.getLogger(__name__)
# Input schemas
class NavigateToolInput(BaseModel):
"""Input for NavigateTool."""
url: str = Field(description="URL to navigate to")
thread_id: str = Field(
default="default", description="Thread ID for the browser session"
)
class ClickToolInput(BaseModel):
"""Input for ClickTool."""
selector: str = Field(description="CSS selector for the element to click on")
thread_id: str = Field(
default="default", description="Thread ID for the browser session"
)
class GetElementsToolInput(BaseModel):
"""Input for GetElementsTool."""
selector: str = Field(description="CSS selector for elements to get")
thread_id: str = Field(
default="default", description="Thread ID for the browser session"
)
class ExtractTextToolInput(BaseModel):
"""Input for ExtractTextTool."""
thread_id: str = Field(
default="default", description="Thread ID for the browser session"
)
class ExtractHyperlinksToolInput(BaseModel):
"""Input for ExtractHyperlinksTool."""
thread_id: str = Field(
default="default", description="Thread ID for the browser session"
)
class NavigateBackToolInput(BaseModel):
"""Input for NavigateBackTool."""
thread_id: str = Field(
default="default", description="Thread ID for the browser session"
)
class CurrentWebPageToolInput(BaseModel):
"""Input for CurrentWebPageTool."""
thread_id: str = Field(
default="default", description="Thread ID for the browser session"
)
# Base tool class
class BrowserBaseTool(BaseTool):
"""Base class for browser tools."""
def __init__(self, session_manager: BrowserSessionManager):
"""Initialize with a session manager."""
super().__init__()
self._session_manager = session_manager
if self._is_in_asyncio_loop() and hasattr(self, "_arun"):
self._original_run = self._run
# Override _run to use _arun when in an asyncio loop
def patched_run(*args, **kwargs):
try:
import nest_asyncio
loop = asyncio.get_event_loop()
nest_asyncio.apply(loop)
return asyncio.get_event_loop().run_until_complete(
self._arun(*args, **kwargs)
)
except Exception as e:
return f"Error in patched _run: {e!s}"
self._run = patched_run
async def get_async_page(self, thread_id: str) -> Any:
"""Get or create a page for the specified thread."""
browser = await self._session_manager.get_async_browser(thread_id)
return await aget_current_page(browser)
def get_sync_page(self, thread_id: str) -> Any:
"""Get or create a page for the specified thread."""
browser = self._session_manager.get_sync_browser(thread_id)
return get_current_page(browser)
def _is_in_asyncio_loop(self) -> bool:
"""Check if we're currently in an asyncio event loop."""
try:
loop = asyncio.get_event_loop()
return loop.is_running()
except RuntimeError:
return False
# Tool classes
class NavigateTool(BrowserBaseTool):
"""Tool for navigating a browser to a URL."""
name: str = "navigate_browser"
description: str = "Navigate a browser to the specified URL"
args_schema: type[BaseModel] = NavigateToolInput
def _run(self, url: str, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Get page for this thread
page = self.get_sync_page(thread_id)
# Validate URL scheme
parsed_url = urlparse(url)
if parsed_url.scheme not in ("http", "https"):
raise ValueError("URL scheme must be 'http' or 'https'")
# Navigate to URL
response = page.goto(url)
status = response.status if response else "unknown"
return f"Navigating to {url} returned status code {status}"
except Exception as e:
return f"Error navigating to {url}: {e!s}"
async def _arun(self, url: str, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Get page for this thread
page = await self.get_async_page(thread_id)
# Validate URL scheme
parsed_url = urlparse(url)
if parsed_url.scheme not in ("http", "https"):
raise ValueError("URL scheme must be 'http' or 'https'")
# Navigate to URL
response = await page.goto(url)
status = response.status if response else "unknown"
return f"Navigating to {url} returned status code {status}"
except Exception as e:
return f"Error navigating to {url}: {e!s}"
class ClickTool(BrowserBaseTool):
"""Tool for clicking on an element with the given CSS selector."""
name: str = "click_element"
description: str = "Click on an element with the given CSS selector"
args_schema: type[BaseModel] = ClickToolInput
visible_only: bool = True
"""Whether to consider only visible elements."""
playwright_strict: bool = False
"""Whether to employ Playwright's strict mode when clicking on elements."""
playwright_timeout: float = 1_000
"""Timeout (in ms) for Playwright to wait for element to be ready."""
def _selector_effective(self, selector: str) -> str:
if not self.visible_only:
return selector
return f"{selector} >> visible=1"
def _run(self, selector: str, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Get the current page
page = self.get_sync_page(thread_id)
# Click on the element
selector_effective = self._selector_effective(selector=selector)
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
try:
page.click(
selector_effective,
strict=self.playwright_strict,
timeout=self.playwright_timeout,
)
except PlaywrightTimeoutError:
return f"Unable to click on element '{selector}'"
except Exception as click_error:
return f"Unable to click on element '{selector}': {click_error!s}"
return f"Clicked element '{selector}'"
except Exception as e:
return f"Error clicking on element: {e!s}"
async def _arun(self, selector: str, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Get the current page
page = await self.get_async_page(thread_id)
# Click on the element
selector_effective = self._selector_effective(selector=selector)
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
try:
await page.click(
selector_effective,
strict=self.playwright_strict,
timeout=self.playwright_timeout,
)
except PlaywrightTimeoutError:
return f"Unable to click on element '{selector}'"
except Exception as click_error:
return f"Unable to click on element '{selector}': {click_error!s}"
return f"Clicked element '{selector}'"
except Exception as e:
return f"Error clicking on element: {e!s}"
class NavigateBackTool(BrowserBaseTool):
"""Tool for navigating back in browser history."""
name: str = "navigate_back"
description: str = "Navigate back to the previous page"
args_schema: type[BaseModel] = NavigateBackToolInput
def _run(self, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Get the current page
page = self.get_sync_page(thread_id)
# Navigate back
try:
page.go_back()
return "Navigated back to the previous page"
except Exception as nav_error:
return f"Unable to navigate back: {nav_error!s}"
except Exception as e:
return f"Error navigating back: {e!s}"
async def _arun(self, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Get the current page
page = await self.get_async_page(thread_id)
# Navigate back
try:
await page.go_back()
return "Navigated back to the previous page"
except Exception as nav_error:
return f"Unable to navigate back: {nav_error!s}"
except Exception as e:
return f"Error navigating back: {e!s}"
class ExtractTextTool(BrowserBaseTool):
"""Tool for extracting text from a webpage."""
name: str = "extract_text"
description: str = "Extract all the text on the current webpage"
args_schema: type[BaseModel] = ExtractTextToolInput
def _run(self, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Import BeautifulSoup
try:
from bs4 import BeautifulSoup
except ImportError:
return (
"The 'beautifulsoup4' package is required to use this tool."
" Please install it with 'pip install beautifulsoup4'."
)
# Get the current page
page = self.get_sync_page(thread_id)
# Extract text
content = page.content()
soup = BeautifulSoup(content, "html.parser")
return soup.get_text(separator="\n").strip()
except Exception as e:
return f"Error extracting text: {e!s}"
async def _arun(self, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Import BeautifulSoup
try:
from bs4 import BeautifulSoup
except ImportError:
return (
"The 'beautifulsoup4' package is required to use this tool."
" Please install it with 'pip install beautifulsoup4'."
)
# Get the current page
page = await self.get_async_page(thread_id)
# Extract text
content = await page.content()
soup = BeautifulSoup(content, "html.parser")
return soup.get_text(separator="\n").strip()
except Exception as e:
return f"Error extracting text: {e!s}"
class ExtractHyperlinksTool(BrowserBaseTool):
"""Tool for extracting hyperlinks from a webpage."""
name: str = "extract_hyperlinks"
description: str = "Extract all hyperlinks on the current webpage"
args_schema: type[BaseModel] = ExtractHyperlinksToolInput
def _run(self, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Import BeautifulSoup
try:
from bs4 import BeautifulSoup
except ImportError:
return (
"The 'beautifulsoup4' package is required to use this tool."
" Please install it with 'pip install beautifulsoup4'."
)
# Get the current page
page = self.get_sync_page(thread_id)
# Extract hyperlinks
content = page.content()
soup = BeautifulSoup(content, "html.parser")
links = []
for link in soup.find_all("a", href=True):
text = link.get_text().strip()
href = link["href"]
if href.startswith(("http", "https")):
links.append({"text": text, "url": href})
if not links:
return "No hyperlinks found on the current page."
return json.dumps(links, indent=2)
except Exception as e:
return f"Error extracting hyperlinks: {e!s}"
async def _arun(self, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Import BeautifulSoup
try:
from bs4 import BeautifulSoup
except ImportError:
return (
"The 'beautifulsoup4' package is required to use this tool."
" Please install it with 'pip install beautifulsoup4'."
)
# Get the current page
page = await self.get_async_page(thread_id)
# Extract hyperlinks
content = await page.content()
soup = BeautifulSoup(content, "html.parser")
links = []
for link in soup.find_all("a", href=True):
text = link.get_text().strip()
href = link["href"]
if href.startswith(("http", "https")):
links.append({"text": text, "url": href})
if not links:
return "No hyperlinks found on the current page."
return json.dumps(links, indent=2)
except Exception as e:
return f"Error extracting hyperlinks: {e!s}"
class GetElementsTool(BrowserBaseTool):
"""Tool for getting elements from a webpage."""
name: str = "get_elements"
description: str = "Get elements from the webpage using a CSS selector"
args_schema: type[BaseModel] = GetElementsToolInput
def _run(self, selector: str, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Get the current page
page = self.get_sync_page(thread_id)
# Get elements
elements = page.query_selector_all(selector)
if not elements:
return f"No elements found with selector '{selector}'"
elements_text = []
for i, element in enumerate(elements):
text = element.text_content()
elements_text.append(f"Element {i + 1}: {text.strip()}")
return "\n".join(elements_text)
except Exception as e:
return f"Error getting elements: {e!s}"
async def _arun(self, selector: str, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Get the current page
page = await self.get_async_page(thread_id)
# Get elements
elements = await page.query_selector_all(selector)
if not elements:
return f"No elements found with selector '{selector}'"
elements_text = []
for i, element in enumerate(elements):
text = await element.text_content()
elements_text.append(f"Element {i + 1}: {text.strip()}")
return "\n".join(elements_text)
except Exception as e:
return f"Error getting elements: {e!s}"
class CurrentWebPageTool(BrowserBaseTool):
"""Tool for getting information about the current webpage."""
name: str = "current_webpage"
description: str = "Get information about the current webpage"
args_schema: type[BaseModel] = CurrentWebPageToolInput
def _run(self, thread_id: str = "default", **kwargs) -> str:
"""Use the sync tool."""
try:
# Get the current page
page = self.get_sync_page(thread_id)
# Get information
url = page.url
title = page.title()
return f"URL: {url}\nTitle: {title}"
except Exception as e:
return f"Error getting current webpage info: {e!s}"
async def _arun(self, thread_id: str = "default", **kwargs) -> str:
"""Use the async tool."""
try:
# Get the current page
page = await self.get_async_page(thread_id)
# Get information
url = page.url
title = await page.title()
return f"URL: {url}\nTitle: {title}"
except Exception as e:
return f"Error getting current webpage info: {e!s}"
class BrowserToolkit:
"""Toolkit for navigating web with AWS Bedrock browser.
This toolkit provides a set of tools for working with a remote browser
and supports multiple threads by maintaining separate browser sessions
for each thread ID. Browsers are created lazily only when needed.
Example:
```python
from crewai import Agent, Task, Crew
from crewai_tools.aws.bedrock.browser import create_browser_toolkit
# Create the browser toolkit
toolkit, browser_tools = create_browser_toolkit(region="us-west-2")
# Create a CrewAI agent that uses the browser tools
research_agent = Agent(
role="Web Researcher",
goal="Research and summarize web content",
backstory="You're an expert at finding information online.",
tools=browser_tools
)
# Create a task for the agent
research_task = Task(
description="Navigate to https://example.com and extract all text content. Summarize the main points.",
agent=research_agent
)
# Create and run the crew
crew = Crew(
agents=[research_agent],
tasks=[research_task]
)
result = crew.kickoff()
# Clean up browser resources when done
import asyncio
asyncio.run(toolkit.cleanup())
```
"""
def __init__(self, region: str = "us-west-2"):
"""
Initialize the toolkit
Args:
region: AWS region for the browser client
"""
self.region = region
self.session_manager = BrowserSessionManager(region=region)
self.tools: list[BaseTool] = []
self._nest_current_loop()
self._setup_tools()
def _nest_current_loop(self):
"""Apply nest_asyncio if we're in an asyncio loop."""
try:
loop = asyncio.get_event_loop()
if loop.is_running():
try:
import nest_asyncio
nest_asyncio.apply(loop)
except Exception as e:
logger.warning(f"Failed to apply nest_asyncio: {e!s}")
except RuntimeError:
pass
def _setup_tools(self) -> None:
"""Initialize tools without creating any browsers."""
self.tools = [
NavigateTool(session_manager=self.session_manager),
ClickTool(session_manager=self.session_manager),
NavigateBackTool(session_manager=self.session_manager),
ExtractTextTool(session_manager=self.session_manager),
ExtractHyperlinksTool(session_manager=self.session_manager),
GetElementsTool(session_manager=self.session_manager),
CurrentWebPageTool(session_manager=self.session_manager),
]
def get_tools(self) -> list[BaseTool]:
"""
Get the list of browser tools
Returns:
List of CrewAI tools
"""
return self.tools
def get_tools_by_name(self) -> dict[str, BaseTool]:
"""
Get a dictionary of tools mapped by their names
Returns:
Dictionary of {tool_name: tool}
"""
return {tool.name: tool for tool in self.tools}
async def cleanup(self) -> None:
"""Clean up all browser sessions asynchronously"""
await self.session_manager.close_all_browsers()
logger.info("All browser sessions cleaned up")
def sync_cleanup(self) -> None:
"""Clean up all browser sessions from synchronous code"""
import asyncio
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.create_task(self.cleanup())
else:
loop.run_until_complete(self.cleanup())
except RuntimeError:
asyncio.run(self.cleanup())
def create_browser_toolkit(
region: str = "us-west-2",
) -> tuple[BrowserToolkit, list[BaseTool]]:
"""
Create a BrowserToolkit
Args:
region: AWS region for browser client
Returns:
Tuple of (toolkit, tools)
"""
toolkit = BrowserToolkit(region=region)
tools = toolkit.get_tools()
return toolkit, tools

View File

@@ -1,43 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from playwright.async_api import Browser as AsyncBrowser
from playwright.async_api import Page as AsyncPage
from playwright.sync_api import Browser as SyncBrowser
from playwright.sync_api import Page as SyncPage
async def aget_current_page(browser: AsyncBrowser | Any) -> AsyncPage:
"""
Asynchronously get the current page of the browser.
Args:
browser: The browser (AsyncBrowser) to get the current page from.
Returns:
AsyncPage: The current page.
"""
if not browser.contexts:
context = await browser.new_context()
return await context.new_page()
context = browser.contexts[0]
if not context.pages:
return await context.new_page()
return context.pages[-1]
def get_current_page(browser: SyncBrowser | Any) -> SyncPage:
"""
Get the current page of the browser.
Args:
browser: The browser to get the current page from.
Returns:
SyncPage: The current page.
"""
if not browser.contexts:
context = browser.new_context()
return context.new_page()
context = browser.contexts[0]
if not context.pages:
return context.new_page()
return context.pages[-1]

View File

@@ -1,217 +0,0 @@
# AWS Bedrock Code Interpreter Tools
This toolkit provides a set of tools for interacting with the AWS Bedrock Code Interpreter environment. It enables your CrewAI agents to execute code, run shell commands, manage files, and perform computational tasks in a secure, isolated environment.
## Features
- Execute code in various languages (primarily Python)
- Run shell commands in the environment
- Read, write, list, and delete files
- Manage long-running tasks asynchronously
- Multiple code interpreter sessions with thread-based isolation
## Installation
Ensure you have the necessary dependencies:
```bash
uv add crewai-tools bedrock-agentcore
```
## Usage
### Basic Usage
```python
from crewai import Agent, Task, Crew, LLM
from crewai_tools.aws import create_code_interpreter_toolkit
# Create the code interpreter toolkit
toolkit, code_tools = create_code_interpreter_toolkit(region="us-west-2")
# Create the Bedrock LLM
llm = LLM(
model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
region_name="us-west-2",
)
# Create a CrewAI agent that uses the code interpreter tools
developer_agent = Agent(
role="Python Developer",
goal="Create and execute Python code to solve problems.",
backstory="You're a skilled Python developer with expertise in data analysis.",
tools=code_tools,
llm=llm
)
# Create a task for the agent
coding_task = Task(
description="Write a Python function that calculates the factorial of a number and test it. Do not use any imports from outside the Python standard library.",
expected_output="The Python function created, and the test results.",
agent=developer_agent
)
# Create and run the crew
crew = Crew(
agents=[developer_agent],
tasks=[coding_task]
)
result = crew.kickoff()
print(f"\n***Final result:***\n\n{result}")
# Clean up resources when done
import asyncio
asyncio.run(toolkit.cleanup())
```
### Available Tools
The toolkit provides the following tools:
1. `execute_code` - Run code in various languages (primarily Python)
2. `execute_command` - Run shell commands in the environment
3. `read_files` - Read content of files in the environment
4. `list_files` - List files in directories
5. `delete_files` - Remove files from the environment
6. `write_files` - Create or update files
7. `start_command_execution` - Start long-running commands asynchronously
8. `get_task` - Check status of async tasks
9. `stop_task` - Stop running tasks
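The file-oriented tools exchange plain path/text pairs; a minimal sketch of the payload shape expected by `write_files` (paths and contents here are hypothetical):
```python
# Each entry is a dict with "path" and "text" fields
files_payload = [
    {"path": "analysis/report.md", "text": "# Results\n..."},
    {"path": "analysis/data.csv", "text": "x,y\n1,2\n"},
]
```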
### Advanced Usage
```python
from crewai import Agent, Task, Crew, LLM
from crewai_tools.aws import create_code_interpreter_toolkit
# Create the code interpreter toolkit
toolkit, code_tools = create_code_interpreter_toolkit(region="us-west-2")
tools_by_name = toolkit.get_tools_by_name()
# Create the Bedrock LLM
llm = LLM(
model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
region_name="us-west-2",
)
# Create agents with specific tools
code_agent = Agent(
role="Code Developer",
goal="Write and execute code",
backstory="You write and test code to solve complex problems.",
tools=[
# Use specific tools by name
tools_by_name["execute_code"],
tools_by_name["execute_command"],
tools_by_name["read_files"],
tools_by_name["write_files"]
],
llm=llm
)
file_agent = Agent(
role="File Manager",
goal="Manage files in the environment",
backstory="You help organize and manage files in the code environment.",
tools=[
# Use specific tools by name
tools_by_name["list_files"],
tools_by_name["read_files"],
tools_by_name["write_files"],
tools_by_name["delete_files"]
],
llm=llm
)
# Create tasks for the agents
coding_task = Task(
description="Write a Python script to analyze data from a CSV file. Do not use any imports from outside the Python standard library.",
expected_output="The Python function created.",
agent=code_agent
)
file_task = Task(
description="Organize the created files into separate directories.",
agent=file_agent
)
# Create and run the crew
crew = Crew(
agents=[code_agent, file_agent],
tasks=[coding_task, file_task]
)
result = crew.kickoff()
print(f"\n***Final result:***\n\n{result}")
# Clean up code interpreter resources when done
import asyncio
asyncio.run(toolkit.cleanup())
```
### Example: Data Analysis with Python
```python
from crewai import Agent, Task, Crew, LLM
from crewai_tools.aws import create_code_interpreter_toolkit
# Create toolkit and tools
toolkit, code_tools = create_code_interpreter_toolkit(region="us-west-2")
# Create the Bedrock LLM
llm = LLM(
model="bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
region_name="us-west-2",
)
# Create a data analyst agent
analyst_agent = Agent(
role="Data Analyst",
goal="Analyze data using Python",
backstory="You're an expert data analyst who uses Python for data processing.",
tools=code_tools,
llm=llm
)
# Create a task for the agent
analysis_task = Task(
description="""
For all of the below, do not use any imports from outside the Python standard library.
1. Create a sample dataset with random data
2. Perform statistical analysis on the dataset
3. Generate visualizations of the results
4. Save the results and visualizations to files
""",
agent=analyst_agent
)
# Create and run the crew
crew = Crew(
agents=[analyst_agent],
tasks=[analysis_task]
)
result = crew.kickoff()
print(f"\n***Final result:***\n\n{result}")
# Clean up resources
import asyncio
asyncio.run(toolkit.cleanup())
```
## Resource Cleanup
Always clean up code interpreter resources when done to prevent resource leaks:
```python
import asyncio
# Clean up all code interpreter sessions
asyncio.run(toolkit.cleanup())
```
## Requirements
- AWS account with access to Bedrock AgentCore API
- Properly configured AWS credentials

View File

@@ -1,6 +0,0 @@
from .code_interpreter_toolkit import (
CodeInterpreterToolkit,
create_code_interpreter_toolkit,
)
__all__ = ["CodeInterpreterToolkit", "create_code_interpreter_toolkit"]

View File

@@ -1,629 +0,0 @@
"""Toolkit for working with AWS Bedrock Code Interpreter."""
from __future__ import annotations
import json
import logging
from typing import TYPE_CHECKING, Any
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
if TYPE_CHECKING:
from bedrock_agentcore.tools.code_interpreter_client import CodeInterpreter
logger = logging.getLogger(__name__)
def extract_output_from_stream(response):
"""
Extract output from code interpreter response stream
Args:
response: Response from code interpreter execution
Returns:
Extracted output as string
"""
output = []
for event in response["stream"]:
if "result" in event:
result = event["result"]
for content_item in result["content"]:
if content_item["type"] == "text":
output.append(content_item["text"])
if content_item["type"] == "resource":
resource = content_item["resource"]
if "text" in resource:
file_path = resource["uri"].replace("file://", "")
file_content = resource["text"]
output.append(f"==== File: {file_path} ====\n{file_content}\n")
else:
output.append(json.dumps(resource))
return "\n".join(output)
# Input schemas
class ExecuteCodeInput(BaseModel):
"""Input for ExecuteCode."""
code: str = Field(description="The code to execute")
language: str = Field(
default="python", description="The programming language of the code"
)
clear_context: bool = Field(
default=False, description="Whether to clear execution context"
)
thread_id: str = Field(
default="default", description="Thread ID for the code interpreter session"
)
class ExecuteCommandInput(BaseModel):
"""Input for ExecuteCommand."""
command: str = Field(description="The command to execute")
thread_id: str = Field(
default="default", description="Thread ID for the code interpreter session"
)
class ReadFilesInput(BaseModel):
"""Input for ReadFiles."""
paths: list[str] = Field(description="List of file paths to read")
thread_id: str = Field(
default="default", description="Thread ID for the code interpreter session"
)
class ListFilesInput(BaseModel):
"""Input for ListFiles."""
directory_path: str = Field(default="", description="Path to the directory to list")
thread_id: str = Field(
default="default", description="Thread ID for the code interpreter session"
)
class DeleteFilesInput(BaseModel):
"""Input for DeleteFiles."""
paths: list[str] = Field(description="List of file paths to delete")
thread_id: str = Field(
default="default", description="Thread ID for the code interpreter session"
)
class WriteFilesInput(BaseModel):
"""Input for WriteFiles."""
files: list[dict[str, str]] = Field(
description="List of dictionaries with path and text fields"
)
thread_id: str = Field(
default="default", description="Thread ID for the code interpreter session"
)
class StartCommandInput(BaseModel):
"""Input for StartCommand."""
command: str = Field(description="The command to execute asynchronously")
thread_id: str = Field(
default="default", description="Thread ID for the code interpreter session"
)
class GetTaskInput(BaseModel):
"""Input for GetTask."""
task_id: str = Field(description="The ID of the task to check")
thread_id: str = Field(
default="default", description="Thread ID for the code interpreter session"
)
class StopTaskInput(BaseModel):
"""Input for StopTask."""
task_id: str = Field(description="The ID of the task to stop")
thread_id: str = Field(
default="default", description="Thread ID for the code interpreter session"
)
# Tool classes
class ExecuteCodeTool(BaseTool):
"""Tool for executing code in various languages."""
name: str = "execute_code"
description: str = "Execute code in various languages (primarily Python)"
args_schema: type[BaseModel] = ExecuteCodeInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(
self,
code: str,
language: str = "python",
clear_context: bool = False,
thread_id: str = "default",
) -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(
thread_id=thread_id
)
# Execute code
response = code_interpreter.invoke(
method="executeCode",
params={
"code": code,
"language": language,
"clearContext": clear_context,
},
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error executing code: {e!s}"
async def _arun(
self,
code: str,
language: str = "python",
clear_context: bool = False,
thread_id: str = "default",
) -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(
code=code,
language=language,
clear_context=clear_context,
thread_id=thread_id,
)
class ExecuteCommandTool(BaseTool):
"""Tool for running shell commands in the code interpreter environment."""
name: str = "execute_command"
description: str = "Run shell commands in the code interpreter environment"
args_schema: type[BaseModel] = ExecuteCommandInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, command: str, thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(
thread_id=thread_id
)
# Execute command
response = code_interpreter.invoke(
method="executeCommand", params={"command": command}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error executing command: {e!s}"
async def _arun(self, command: str, thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(command=command, thread_id=thread_id)
class ReadFilesTool(BaseTool):
"""Tool for reading content of files in the environment."""
name: str = "read_files"
description: str = "Read content of files in the environment"
args_schema: type[BaseModel] = ReadFilesInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, paths: list[str], thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(
thread_id=thread_id
)
# Read files
response = code_interpreter.invoke(
method="readFiles", params={"paths": paths}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error reading files: {e!s}"
async def _arun(self, paths: list[str], thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(paths=paths, thread_id=thread_id)
class ListFilesTool(BaseTool):
"""Tool for listing files in directories in the environment."""
name: str = "list_files"
description: str = "List files in directories in the environment"
args_schema: type[BaseModel] = ListFilesInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, directory_path: str = "", thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(
thread_id=thread_id
)
# List files
response = code_interpreter.invoke(
method="listFiles", params={"directoryPath": directory_path}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error listing files: {e!s}"
async def _arun(self, directory_path: str = "", thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(directory_path=directory_path, thread_id=thread_id)
class DeleteFilesTool(BaseTool):
"""Tool for removing files from the environment."""
name: str = "delete_files"
description: str = "Remove files from the environment"
args_schema: type[BaseModel] = DeleteFilesInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, paths: list[str], thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(
thread_id=thread_id
)
# Remove files
response = code_interpreter.invoke(
method="removeFiles", params={"paths": paths}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error deleting files: {e!s}"
async def _arun(self, paths: list[str], thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(paths=paths, thread_id=thread_id)
class WriteFilesTool(BaseTool):
"""Tool for creating or updating files in the environment."""
name: str = "write_files"
description: str = "Create or update files in the environment"
args_schema: type[BaseModel] = WriteFilesInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, files: list[dict[str, str]], thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(
thread_id=thread_id
)
# Write files
response = code_interpreter.invoke(
method="writeFiles", params={"content": files}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error writing files: {e!s}"
async def _arun(
self, files: list[dict[str, str]], thread_id: str = "default"
) -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(files=files, thread_id=thread_id)
class StartCommandTool(BaseTool):
"""Tool for starting long-running commands asynchronously."""
name: str = "start_command_execution"
description: str = "Start long-running commands asynchronously"
args_schema: type[BaseModel] = StartCommandInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, command: str, thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(
thread_id=thread_id
)
# Start command execution
response = code_interpreter.invoke(
method="startCommandExecution", params={"command": command}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error starting command: {e!s}"
async def _arun(self, command: str, thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(command=command, thread_id=thread_id)
class GetTaskTool(BaseTool):
"""Tool for checking status of async tasks."""
name: str = "get_task"
description: str = "Check status of async tasks"
args_schema: type[BaseModel] = GetTaskInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, task_id: str, thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(
thread_id=thread_id
)
# Get task status
response = code_interpreter.invoke(
method="getTask", params={"taskId": task_id}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error getting task status: {e!s}"
async def _arun(self, task_id: str, thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(task_id=task_id, thread_id=thread_id)
class StopTaskTool(BaseTool):
"""Tool for stopping running tasks."""
name: str = "stop_task"
description: str = "Stop running tasks"
args_schema: type[BaseModel] = StopTaskInput
toolkit: Any = Field(default=None, exclude=True)
def __init__(self, toolkit):
super().__init__()
self.toolkit = toolkit
def _run(self, task_id: str, thread_id: str = "default") -> str:
try:
# Get or create code interpreter
code_interpreter = self.toolkit._get_or_create_interpreter(
thread_id=thread_id
)
# Stop task
response = code_interpreter.invoke(
method="stopTask", params={"taskId": task_id}
)
return extract_output_from_stream(response)
except Exception as e:
return f"Error stopping task: {e!s}"
async def _arun(self, task_id: str, thread_id: str = "default") -> str:
# Use _run as we're working with a synchronous API that's thread-safe
return self._run(task_id=task_id, thread_id=thread_id)
class CodeInterpreterToolkit:
"""Toolkit for working with AWS Bedrock code interpreter environment.
This toolkit provides a set of tools for working with a remote code interpreter environment:
* execute_code - Run code in various languages (primarily Python)
* execute_command - Run shell commands
* read_files - Read content of files in the environment
* list_files - List files in directories
* delete_files - Remove files from the environment
* write_files - Create or update files
* start_command_execution - Start long-running commands asynchronously
* get_task - Check status of async tasks
* stop_task - Stop running tasks
The toolkit lazily initializes the code interpreter session on first use.
It supports multiple threads by maintaining separate code interpreter sessions for each thread ID.
Example:
```python
from crewai import Agent, Task, Crew
from crewai_tools.aws.bedrock.code_interpreter import create_code_interpreter_toolkit
# Create the code interpreter toolkit
toolkit, code_tools = create_code_interpreter_toolkit(region="us-west-2")
# Create a CrewAI agent that uses the code interpreter tools
developer_agent = Agent(
role="Python Developer",
goal="Create and execute Python code to solve problems",
backstory="You're a skilled Python developer with expertise in data analysis.",
tools=code_tools
)
# Create a task for the agent
coding_task = Task(
description="Write a Python function that calculates the factorial of a number and test it.",
agent=developer_agent
)
# Create and run the crew
crew = Crew(
agents=[developer_agent],
tasks=[coding_task]
)
result = crew.kickoff()
# Clean up resources when done
import asyncio
asyncio.run(toolkit.cleanup())
```
"""
def __init__(self, region: str = "us-west-2"):
"""
Initialize the toolkit
Args:
region: AWS region for the code interpreter
"""
self.region = region
self._code_interpreters: dict[str, CodeInterpreter] = {}
self.tools: list[BaseTool] = []
self._setup_tools()
def _setup_tools(self) -> None:
"""Initialize tools without creating any code interpreter sessions."""
self.tools = [
ExecuteCodeTool(self),
ExecuteCommandTool(self),
ReadFilesTool(self),
ListFilesTool(self),
DeleteFilesTool(self),
WriteFilesTool(self),
StartCommandTool(self),
GetTaskTool(self),
StopTaskTool(self),
]
def _get_or_create_interpreter(self, thread_id: str = "default") -> CodeInterpreter:
"""Get or create a code interpreter for the specified thread.
Args:
thread_id: Thread ID for the code interpreter session
Returns:
CodeInterpreter instance
"""
if thread_id in self._code_interpreters:
return self._code_interpreters[thread_id]
# Create a new code interpreter for this thread
from bedrock_agentcore.tools.code_interpreter_client import CodeInterpreter
code_interpreter = CodeInterpreter(region=self.region)
code_interpreter.start()
logger.info(
f"Started code interpreter with session_id:{code_interpreter.session_id} for thread:{thread_id}"
)
# Store the interpreter
self._code_interpreters[thread_id] = code_interpreter
return code_interpreter
def get_tools(self) -> list[BaseTool]:
"""
Get the list of code interpreter tools
Returns:
List of CrewAI tools
"""
return self.tools
def get_tools_by_name(self) -> dict[str, BaseTool]:
"""
Get a dictionary of tools mapped by their names
Returns:
Dictionary of {tool_name: tool}
"""
return {tool.name: tool for tool in self.tools}
async def cleanup(self, thread_id: str | None = None) -> None:
"""Clean up resources
Args:
thread_id: Optional thread ID to clean up. If None, cleans up all sessions.
"""
if thread_id:
# Clean up a specific thread's session
if thread_id in self._code_interpreters:
try:
self._code_interpreters[thread_id].stop()
del self._code_interpreters[thread_id]
logger.info(
f"Code interpreter session for thread {thread_id} cleaned up"
)
except Exception as e:
logger.warning(
f"Error stopping code interpreter for thread {thread_id}: {e}"
)
else:
# Clean up all sessions
thread_ids = list(self._code_interpreters.keys())
for tid in thread_ids:
try:
self._code_interpreters[tid].stop()
except Exception as e:
logger.warning(
f"Error stopping code interpreter for thread {tid}: {e}"
)
self._code_interpreters = {}
logger.info("All code interpreter sessions cleaned up")
def create_code_interpreter_toolkit(
region: str = "us-west-2",
) -> tuple[CodeInterpreterToolkit, list[BaseTool]]:
"""
Create a CodeInterpreterToolkit
Args:
region: AWS region for code interpreter
Returns:
Tuple of (toolkit, tools)
"""
toolkit = CodeInterpreterToolkit(region=region)
tools = toolkit.get_tools()
return toolkit, tools

View File

@@ -1,17 +0,0 @@
"""Custom exceptions for AWS Bedrock integration."""
class BedrockError(Exception):
"""Base exception for Bedrock-related errors."""
class BedrockAgentError(BedrockError):
"""Exception raised for errors in the Bedrock Agent operations."""
class BedrockKnowledgeBaseError(BedrockError):
"""Exception raised for errors in the Bedrock Knowledge Base operations."""
class BedrockValidationError(BedrockError):
"""Exception raised for validation errors in Bedrock operations."""

View File

@@ -1,159 +0,0 @@
# BedrockKBRetrieverTool
The `BedrockKBRetrieverTool` enables CrewAI agents to retrieve information from Amazon Bedrock Knowledge Bases using natural language queries.
## Installation
```bash
pip install 'crewai[tools]'
```
## Requirements
- AWS credentials configured (either through environment variables or AWS CLI)
- `boto3` and `python-dotenv` packages
- Access to Amazon Bedrock Knowledge Base
## Usage
Here's how to use the tool with a CrewAI agent:
```python
from crewai import Agent, Task, Crew
from crewai_tools.aws.bedrock.knowledge_base.retriever_tool import BedrockKBRetrieverTool
# Initialize the tool
kb_tool = BedrockKBRetrieverTool(
knowledge_base_id="your-kb-id",
number_of_results=5
)
# Create a CrewAI agent that uses the tool
researcher = Agent(
role='Knowledge Base Researcher',
goal='Find information about company policies',
backstory='I am a researcher specialized in retrieving and analyzing company documentation.',
tools=[kb_tool],
verbose=True
)
# Create a task for the agent
research_task = Task(
description="Find our company's remote work policy and summarize the key points.",
agent=researcher
)
# Create a crew with the agent
crew = Crew(
agents=[researcher],
tasks=[research_task],
verbose=True
)
# Run the crew
result = crew.kickoff()
print(result)
```
## Tool Arguments
| Argument | Type | Required | Default | Description |
|----------|------|----------|---------|-------------|
| knowledge_base_id | str | Yes | None | The unique identifier of the knowledge base (up to 10 alphanumeric characters) |
| number_of_results | int | No | 5 | Maximum number of results to return |
| retrieval_configuration | dict | No | None | Custom configurations for the knowledge base query |
| guardrail_configuration | dict | No | None | Content filtering settings |
| next_token | str | No | None | Token for pagination |
## Environment Variables
```bash
BEDROCK_KB_ID=your-knowledge-base-id # Alternative to passing knowledge_base_id
AWS_REGION=your-aws-region # Defaults to us-east-1
AWS_ACCESS_KEY_ID=your-access-key # Required for AWS authentication
AWS_SECRET_ACCESS_KEY=your-secret-key # Required for AWS authentication
```
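When `BEDROCK_KB_ID` is set, the tool can be constructed without passing the ID explicitly. A minimal sketch of that pattern (the ID value below is a placeholder):
```python
import os
from crewai_tools.aws.bedrock.knowledge_base.retriever_tool import BedrockKBRetrieverTool

os.environ["BEDROCK_KB_ID"] = "KB1234ABCD"  # placeholder; normally configured outside the code

# knowledge_base_id is read from BEDROCK_KB_ID when the argument is omitted
kb_tool = BedrockKBRetrieverTool(number_of_results=3)
```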
## Response Format
The tool returns results in JSON format:
```json
{
"results": [
{
"content": "Retrieved text content",
"content_type": "text",
"source_type": "S3",
"source_uri": "s3://bucket/document.pdf",
"score": 0.95,
"metadata": {
"additional": "metadata"
}
}
],
"nextToken": "pagination-token",
"guardrailAction": "NONE"
}
```
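Because the tool returns a JSON string, downstream code can parse it directly; a minimal sketch (here `result` is assumed to hold the string returned by the tool):
```python
import json

payload = json.loads(result)
for item in payload.get("results", []):
    print(item["source_uri"], item.get("score"))
```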
## Advanced Usage
### Custom Retrieval Configuration
```python
kb_tool = BedrockKBRetrieverTool(
knowledge_base_id="your-kb-id",
retrieval_configuration={
"vectorSearchConfiguration": {
"numberOfResults": 10,
"overrideSearchType": "HYBRID"
}
}
)
policy_expert = Agent(
role='Policy Expert',
goal='Analyze company policies in detail',
backstory='I am an expert in corporate policy analysis with deep knowledge of regulatory requirements.',
tools=[kb_tool]
)
```
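### Guardrail Configuration
The `guardrail_configuration` argument is forwarded unchanged to the Retrieve API call. A minimal sketch, assuming the standard `guardrailId`/`guardrailVersion` fields accepted by Bedrock (both values below are placeholders):
```python
kb_tool = BedrockKBRetrieverTool(
    knowledge_base_id="your-kb-id",
    guardrail_configuration={
        "guardrailId": "your-guardrail-id",
        "guardrailVersion": "1"
    }
)
```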
## Supported Data Sources
- Amazon S3
- Confluence
- Salesforce
- SharePoint
- Web pages
- Custom document locations
- Amazon Kendra
- SQL databases
## Use Cases
### Enterprise Knowledge Integration
- Enable CrewAI agents to access your organization's proprietary knowledge without exposing sensitive data
- Allow agents to make decisions based on your company's specific policies, procedures, and documentation
- Create agents that can answer questions based on your internal documentation while maintaining data security
### Specialized Domain Knowledge
- Connect CrewAI agents to domain-specific knowledge bases (legal, medical, technical) without retraining models
- Leverage existing knowledge repositories that are already maintained in your AWS environment
- Combine CrewAI's reasoning with domain-specific information from your knowledge bases
### Data-Driven Decision Making
- Ground CrewAI agent responses in your actual company data rather than general knowledge
- Ensure agents provide recommendations based on your specific business context and documentation
- Reduce hallucinations by retrieving factual information from your knowledge bases
### Scalable Information Access
- Access terabytes of organizational knowledge without embedding it all into your models
- Dynamically query only the relevant information needed for specific tasks
- Leverage AWS's scalable infrastructure to handle large knowledge bases efficiently
### Compliance and Governance
- Ensure CrewAI agents provide responses that align with your company's approved documentation
- Create auditable trails of information sources used by your agents
- Maintain control over what information sources your agents can access

View File

@@ -1,3 +0,0 @@
from .retriever_tool import BedrockKBRetrieverTool
__all__ = ["BedrockKBRetrieverTool"]

View File

@@ -1,261 +0,0 @@
import json
import os
from typing import Any
from crewai.tools import BaseTool
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from ..exceptions import BedrockKnowledgeBaseError, BedrockValidationError
# Load environment variables from .env file
load_dotenv()
class BedrockKBRetrieverToolInput(BaseModel):
"""Input schema for BedrockKBRetrieverTool."""
query: str = Field(
..., description="The query to retrieve information from the knowledge base"
)
class BedrockKBRetrieverTool(BaseTool):
name: str = "Bedrock Knowledge Base Retriever Tool"
description: str = (
"Retrieves information from an Amazon Bedrock Knowledge Base given a query"
)
args_schema: type[BaseModel] = BedrockKBRetrieverToolInput
knowledge_base_id: str | None = None
number_of_results: int | None = 5
retrieval_configuration: dict[str, Any] | None = None
guardrail_configuration: dict[str, Any] | None = None
next_token: str | None = None
package_dependencies: list[str] = ["boto3"]
def __init__(
self,
knowledge_base_id: str | None = None,
number_of_results: int | None = 5,
retrieval_configuration: dict[str, Any] | None = None,
guardrail_configuration: dict[str, Any] | None = None,
next_token: str | None = None,
**kwargs,
):
"""Initialize the BedrockKBRetrieverTool with knowledge base configuration.
Args:
knowledge_base_id (str): The unique identifier of the knowledge base to query
number_of_results (Optional[int], optional): The maximum number of results to return. Defaults to 5.
retrieval_configuration (Optional[Dict[str, Any]], optional): Configurations for the knowledge base query and retrieval process. Defaults to None.
guardrail_configuration (Optional[Dict[str, Any]], optional): Guardrail settings. Defaults to None.
next_token (Optional[str], optional): Token for retrieving the next batch of results. Defaults to None.
"""
super().__init__(**kwargs)
# Get knowledge_base_id from environment variable if not provided
self.knowledge_base_id = knowledge_base_id or os.getenv("BEDROCK_KB_ID")
self.number_of_results = number_of_results
self.guardrail_configuration = guardrail_configuration
self.next_token = next_token
# Build retrieval_configuration from the individual parameters, or use the one supplied directly
if retrieval_configuration is None:
self.retrieval_configuration = self._build_retrieval_configuration()
else:
self.retrieval_configuration = retrieval_configuration
# Validate parameters
self._validate_parameters()
# Update the description to include the knowledge base details
self.description = f"Retrieves information from Amazon Bedrock Knowledge Base '{self.knowledge_base_id}' given a query"
def _build_retrieval_configuration(self) -> dict[str, Any]:
"""Build the retrieval configuration based on provided parameters.
Returns:
Dict[str, Any]: The constructed retrieval configuration
"""
vector_search_config = {}
# Add number of results if provided
if self.number_of_results is not None:
vector_search_config["numberOfResults"] = self.number_of_results
return {"vectorSearchConfiguration": vector_search_config}
def _validate_parameters(self):
"""Validate the parameters according to AWS API requirements."""
try:
# Validate knowledge_base_id
if not self.knowledge_base_id:
raise BedrockValidationError("knowledge_base_id cannot be empty")
if not isinstance(self.knowledge_base_id, str):
raise BedrockValidationError("knowledge_base_id must be a string")
if len(self.knowledge_base_id) > 10:
raise BedrockValidationError(
"knowledge_base_id must be 10 characters or less"
)
if not all(c.isalnum() for c in self.knowledge_base_id):
raise BedrockValidationError(
"knowledge_base_id must contain only alphanumeric characters"
)
# Validate next_token if provided
if self.next_token:
if not isinstance(self.next_token, str):
raise BedrockValidationError("next_token must be a string")
if len(self.next_token) < 1 or len(self.next_token) > 2048:
raise BedrockValidationError(
"next_token must be between 1 and 2048 characters"
)
if " " in self.next_token:
raise BedrockValidationError("next_token cannot contain spaces")
# Validate number_of_results if provided
if self.number_of_results is not None:
if not isinstance(self.number_of_results, int):
raise BedrockValidationError("number_of_results must be an integer")
if self.number_of_results < 1:
raise BedrockValidationError(
"number_of_results must be greater than 0"
)
except BedrockValidationError as e:
raise BedrockValidationError(f"Parameter validation failed: {e!s}")
def _process_retrieval_result(self, result: dict[str, Any]) -> dict[str, Any]:
"""Process a single retrieval result from Bedrock Knowledge Base.
Args:
result (Dict[str, Any]): Raw result from Bedrock Knowledge Base
Returns:
Dict[str, Any]: Processed result with standardized format
"""
# Extract content
content_obj = result.get("content", {})
content = content_obj.get("text", "")
content_type = content_obj.get("type", "text")
# Extract location information
location = result.get("location", {})
location_type = location.get("type", "unknown")
source_uri = None
# Map for location types and their URI fields
location_mapping = {
"s3Location": {"field": "uri", "type": "S3"},
"confluenceLocation": {"field": "url", "type": "Confluence"},
"salesforceLocation": {"field": "url", "type": "Salesforce"},
"sharePointLocation": {"field": "url", "type": "SharePoint"},
"webLocation": {"field": "url", "type": "Web"},
"customDocumentLocation": {"field": "id", "type": "CustomDocument"},
"kendraDocumentLocation": {"field": "uri", "type": "KendraDocument"},
"sqlLocation": {"field": "query", "type": "SQL"},
}
# Extract the URI based on location type
for loc_key, config in location_mapping.items():
if loc_key in location:
source_uri = location[loc_key].get(config["field"])
if not location_type or location_type == "unknown":
location_type = config["type"]
break
# Create result object
result_object = {
"content": content,
"content_type": content_type,
"source_type": location_type,
"source_uri": source_uri,
}
# Add optional fields if available
if "score" in result:
result_object["score"] = result["score"]
if "metadata" in result:
result_object["metadata"] = result["metadata"]
# Handle byte content if present
if "byteContent" in content_obj:
result_object["byte_content"] = content_obj["byteContent"]
# Handle row content if present
if "row" in content_obj:
result_object["row_content"] = content_obj["row"]
return result_object
def _run(self, query: str) -> str:
try:
import boto3
from botocore.exceptions import ClientError
except ImportError:
raise ImportError("`boto3` package not found, please run `uv add boto3`")
try:
# Initialize the Bedrock Agent Runtime client
bedrock_agent_runtime = boto3.client(
"bedrock-agent-runtime",
region_name=os.getenv(
"AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "us-east-1")
),
# AWS SDK will automatically use AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from environment
)
# Prepare the request parameters
retrieve_params = {
"knowledgeBaseId": self.knowledge_base_id,
"retrievalQuery": {"text": query},
}
# Add optional parameters if provided
if self.retrieval_configuration:
retrieve_params["retrievalConfiguration"] = self.retrieval_configuration
if self.guardrail_configuration:
retrieve_params["guardrailConfiguration"] = self.guardrail_configuration
if self.next_token:
retrieve_params["nextToken"] = self.next_token
# Make the retrieve API call
response = bedrock_agent_runtime.retrieve(**retrieve_params)
# Process the response
results = []
for result in response.get("retrievalResults", []):
processed_result = self._process_retrieval_result(result)
results.append(processed_result)
# Build the response object
response_object = {}
if results:
response_object["results"] = results
else:
response_object["message"] = "No results found for the given query."
if "nextToken" in response:
response_object["nextToken"] = response["nextToken"]
if "guardrailAction" in response:
response_object["guardrailAction"] = response["guardrailAction"]
# Return the results as a JSON string
return json.dumps(response_object, indent=2)
except ClientError as e:
error_code = "Unknown"
error_message = str(e)
# Try to extract error code if available
if hasattr(e, "response") and "Error" in e.response:
error_code = e.response["Error"].get("Code", "Unknown")
error_message = e.response["Error"].get("Message", str(e))
raise BedrockKnowledgeBaseError(f"Error ({error_code}): {error_message}")
except Exception as e:
raise BedrockKnowledgeBaseError(f"Unexpected error: {e!s}")

View File

@@ -1,52 +0,0 @@
# AWS S3 Tools
## Description
These tools provide a way to interact with Amazon S3, a cloud storage service.
## Installation
Install the crewai_tools package
```shell
pip install 'crewai[tools]'
```
## AWS Connectivity
The tools use `boto3` to connect to AWS S3.
You can configure your environment to use AWS IAM roles; see the [AWS IAM Roles documentation](https://docs.aws.amazon.com/sdk-for-python/v1/developer-guide/iam-roles.html#creating-an-iam-role).
Alternatively, set the following environment variables:
- `CREW_AWS_REGION`
- `CREW_AWS_ACCESS_KEY_ID`
- `CREW_AWS_SEC_ACCESS_KEY`
## Usage
To use the AWS S3 tools in your CrewAI agents, import the necessary tools and include them in your agent's configuration:
```python
from crewai_tools.aws.s3 import S3ReaderTool, S3WriterTool
# For reading from S3
@agent
def file_retriever(self) -> Agent:
return Agent(
config=self.agents_config['file_retriever'],
verbose=True,
tools=[S3ReaderTool()]
)
# For writing to S3
@agent
def file_uploader(self) -> Agent:
return Agent(
config=self.agents_config['file_uploader'],
verbose=True,
tools=[S3WriterTool()]
)
```
These tools can be used to read from and write to S3 buckets within your CrewAI workflows. Make sure you have properly configured your AWS credentials as mentioned in the AWS Connectivity section above.
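For a quick local check, the tools can also be invoked directly, outside of an agent. A minimal sketch, assuming the standard `run()` entry point exposed by CrewAI's `BaseTool` (bucket and key names are placeholders):
```python
from crewai_tools.aws.s3 import S3ReaderTool, S3WriterTool

writer = S3WriterTool()
reader = S3ReaderTool()

# Write a small text object, then read it back (placeholder bucket/key)
print(writer.run(file_path="s3://my-bucket/notes/hello.txt", content="hello from crewai"))
print(reader.run(file_path="s3://my-bucket/notes/hello.txt"))
```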

View File

@@ -1,2 +0,0 @@
from .reader_tool import S3ReaderTool
from .writer_tool import S3WriterTool

View File

@@ -1,47 +0,0 @@
import os
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
class S3ReaderToolInput(BaseModel):
"""Input schema for S3ReaderTool."""
file_path: str = Field(
..., description="S3 file path (e.g., 's3://bucket-name/file-name')"
)
class S3ReaderTool(BaseTool):
name: str = "S3 Reader Tool"
description: str = "Reads a file from Amazon S3 given an S3 file path"
args_schema: type[BaseModel] = S3ReaderToolInput
package_dependencies: list[str] = ["boto3"]
def _run(self, file_path: str) -> str:
try:
import boto3
from botocore.exceptions import ClientError
except ImportError:
raise ImportError("`boto3` package not found, please run `uv add boto3`")
try:
bucket_name, object_key = self._parse_s3_path(file_path)
s3 = boto3.client(
"s3",
region_name=os.getenv("CREW_AWS_REGION", "us-east-1"),
aws_access_key_id=os.getenv("CREW_AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("CREW_AWS_SEC_ACCESS_KEY"),
)
# Read file content from S3
response = s3.get_object(Bucket=bucket_name, Key=object_key)
return response["Body"].read().decode("utf-8")
except ClientError as e:
return f"Error reading file from S3: {e!s}"
def _parse_s3_path(self, file_path: str) -> tuple:
parts = file_path.replace("s3://", "").split("/", 1)
return parts[0], parts[1]

View File

@@ -1,48 +0,0 @@
import os
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
class S3WriterToolInput(BaseModel):
"""Input schema for S3WriterTool."""
file_path: str = Field(
..., description="S3 file path (e.g., 's3://bucket-name/file-name')"
)
content: str = Field(..., description="Content to write to the file")
class S3WriterTool(BaseTool):
name: str = "S3 Writer Tool"
description: str = "Writes content to a file in Amazon S3 given an S3 file path"
args_schema: type[BaseModel] = S3WriterToolInput
package_dependencies: list[str] = ["boto3"]
def _run(self, file_path: str, content: str) -> str:
try:
import boto3
from botocore.exceptions import ClientError
except ImportError:
raise ImportError("`boto3` package not found, please run `uv add boto3`")
try:
bucket_name, object_key = self._parse_s3_path(file_path)
s3 = boto3.client(
"s3",
region_name=os.getenv("CREW_AWS_REGION", "us-east-1"),
aws_access_key_id=os.getenv("CREW_AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("CREW_AWS_SEC_ACCESS_KEY"),
)
s3.put_object(
Bucket=bucket_name, Key=object_key, Body=content.encode("utf-8")
)
return f"Successfully wrote content to {file_path}"
except ClientError as e:
return f"Error writing file to S3: {e!s}"
def _parse_s3_path(self, file_path: str) -> tuple:
parts = file_path.replace("s3://", "").split("/", 1)
return parts[0], parts[1]

View File

@@ -1,129 +0,0 @@
"""Utility for colored console output."""
class Printer:
"""Handles colored console output formatting."""
@staticmethod
def print(content: str, color: str | None = None) -> None:
"""Prints content with optional color formatting.
Args:
content: The string to be printed.
color: Optional color name to format the output. If provided,
must match one of the _print_* methods available in this class.
If not provided or if the color is not supported, prints without
formatting.
"""
if hasattr(Printer, f"_print_{color}"):
getattr(Printer, f"_print_{color}")(content)
else:
print(content)
@staticmethod
def _print_bold_purple(content: str) -> None:
"""Prints content in bold purple color.
Args:
content: The string to be printed in bold purple.
"""
print(f"\033[1m\033[95m {content}\033[00m")
@staticmethod
def _print_bold_green(content: str) -> None:
"""Prints content in bold green color.
Args:
content: The string to be printed in bold green.
"""
print(f"\033[1m\033[92m {content}\033[00m")
@staticmethod
def _print_purple(content: str) -> None:
"""Prints content in purple color.
Args:
content: The string to be printed in purple.
"""
print(f"\033[95m {content}\033[00m")
@staticmethod
def _print_red(content: str) -> None:
"""Prints content in red color.
Args:
content: The string to be printed in red.
"""
print(f"\033[91m {content}\033[00m")
@staticmethod
def _print_bold_blue(content: str) -> None:
"""Prints content in bold blue color.
Args:
content: The string to be printed in bold blue.
"""
print(f"\033[1m\033[94m {content}\033[00m")
@staticmethod
def _print_yellow(content: str) -> None:
"""Prints content in yellow color.
Args:
content: The string to be printed in yellow.
"""
print(f"\033[93m {content}\033[00m")
@staticmethod
def _print_bold_yellow(content: str) -> None:
"""Prints content in bold yellow color.
Args:
content: The string to be printed in bold yellow.
"""
print(f"\033[1m\033[93m {content}\033[00m")
@staticmethod
def _print_cyan(content: str) -> None:
"""Prints content in cyan color.
Args:
content: The string to be printed in cyan.
"""
print(f"\033[96m {content}\033[00m")
@staticmethod
def _print_bold_cyan(content: str) -> None:
"""Prints content in bold cyan color.
Args:
content: The string to be printed in bold cyan.
"""
print(f"\033[1m\033[96m {content}\033[00m")
@staticmethod
def _print_magenta(content: str) -> None:
"""Prints content in magenta color.
Args:
content: The string to be printed in magenta.
"""
print(f"\033[35m {content}\033[00m")
@staticmethod
def _print_bold_magenta(content: str) -> None:
"""Prints content in bold magenta color.
Args:
content: The string to be printed in bold magenta.
"""
print(f"\033[1m\033[35m {content}\033[00m")
@staticmethod
def _print_green(content: str) -> None:
"""Prints content in green color.
Args:
content: The string to be printed in green.
"""
print(f"\033[32m {content}\033[00m")

View File

@@ -1,8 +0,0 @@
from crewai_tools.rag.core import RAG, EmbeddingService
from crewai_tools.rag.data_types import DataType
__all__ = [
"RAG",
"DataType",
"EmbeddingService",
]

View File

@@ -1,41 +0,0 @@
from abc import ABC, abstractmethod
from typing import Any
from pydantic import BaseModel, Field
from crewai_tools.rag.misc import compute_sha256
from crewai_tools.rag.source_content import SourceContent
class LoaderResult(BaseModel):
content: str = Field(description="The text content of the source")
source: str = Field(description="The source of the content", default="unknown")
metadata: dict[str, Any] = Field(
description="The metadata of the source", default_factory=dict
)
doc_id: str = Field(description="The id of the document")
class BaseLoader(ABC):
def __init__(self, config: dict[str, Any] | None = None):
self.config = config or {}
@abstractmethod
def load(self, content: SourceContent, **kwargs) -> LoaderResult: ...
def generate_doc_id(
self, source_ref: str | None = None, content: str | None = None
) -> str:
"""
Generate a unique document id based on the source reference and content.
If the source reference is not provided, the content is used as the source reference.
If the content is not provided, the source reference is used as the content.
If both are provided, the source reference is used as the content.
Both are optional because the TEXT content type does not have a source reference. In this case, the content is used as the source reference.
"""
source_ref = source_ref or ""
content = content or ""
return compute_sha256(source_ref + content)

View File

@@ -1,19 +0,0 @@
from crewai_tools.rag.chunkers.base_chunker import BaseChunker
from crewai_tools.rag.chunkers.default_chunker import DefaultChunker
from crewai_tools.rag.chunkers.structured_chunker import (
CsvChunker,
JsonChunker,
XmlChunker,
)
from crewai_tools.rag.chunkers.text_chunker import DocxChunker, MdxChunker, TextChunker
__all__ = [
"BaseChunker",
"CsvChunker",
"DefaultChunker",
"DocxChunker",
"JsonChunker",
"MdxChunker",
"TextChunker",
"XmlChunker",
]

View File

@@ -1,180 +0,0 @@
import re
class RecursiveCharacterTextSplitter:
"""
A text splitter that recursively splits text based on a hierarchy of separators.
"""
def __init__(
self,
chunk_size: int = 4000,
chunk_overlap: int = 200,
separators: list[str] | None = None,
keep_separator: bool = True,
):
"""
Initialize the RecursiveCharacterTextSplitter.
Args:
chunk_size: Maximum size of each chunk
chunk_overlap: Number of characters to overlap between chunks
separators: List of separators to use for splitting (in order of preference)
keep_separator: Whether to keep the separator in the split text
"""
if chunk_overlap >= chunk_size:
raise ValueError(
f"Chunk overlap ({chunk_overlap}) cannot be >= chunk size ({chunk_size})"
)
self._chunk_size = chunk_size
self._chunk_overlap = chunk_overlap
self._keep_separator = keep_separator
self._separators = separators or [
"\n\n",
"\n",
" ",
"",
]
def split_text(self, text: str) -> list[str]:
return self._split_text(text, self._separators)
def _split_text(self, text: str, separators: list[str]) -> list[str]:
separator = separators[-1]
new_separators = []
for i, sep in enumerate(separators):
if sep == "":
separator = sep
break
if re.search(re.escape(sep), text):
separator = sep
new_separators = separators[i + 1 :]
break
splits = self._split_text_with_separator(text, separator)
good_splits = []
for split in splits:
if len(split) < self._chunk_size:
good_splits.append(split)
else:
if new_separators:
other_info = self._split_text(split, new_separators)
good_splits.extend(other_info)
else:
good_splits.extend(self._split_by_characters(split))
return self._merge_splits(good_splits, separator)
def _split_text_with_separator(self, text: str, separator: str) -> list[str]:
if separator == "":
return list(text)
if self._keep_separator and separator in text:
parts = text.split(separator)
splits = []
for i, part in enumerate(parts):
if i == 0:
splits.append(part)
elif i == len(parts) - 1:
if part:
splits.append(separator + part)
else:
if part:
splits.append(separator + part)
else:
if splits:
splits[-1] += separator
return [s for s in splits if s]
return text.split(separator)
def _split_by_characters(self, text: str) -> list[str]:
chunks = []
for i in range(0, len(text), self._chunk_size):
chunks.append(text[i : i + self._chunk_size])
return chunks
def _merge_splits(self, splits: list[str], separator: str) -> list[str]:
"""Merge splits into chunks with proper overlap."""
docs = []
current_doc = []
total = 0
for split in splits:
split_len = len(split)
if total + split_len > self._chunk_size and current_doc:
if separator == "":
doc = "".join(current_doc)
else:
if self._keep_separator and separator == " ":
doc = "".join(current_doc)
else:
doc = separator.join(current_doc)
if doc:
docs.append(doc)
# Handle overlap by keeping some of the previous content
while total > self._chunk_overlap and len(current_doc) > 1:
removed = current_doc.pop(0)
total -= len(removed)
if separator != "":
total -= len(separator)
current_doc.append(split)
total += split_len
if separator != "" and len(current_doc) > 1:
total += len(separator)
if current_doc:
if separator == "":
doc = "".join(current_doc)
else:
if self._keep_separator and separator == " ":
doc = "".join(current_doc)
else:
doc = separator.join(current_doc)
if doc:
docs.append(doc)
return docs
class BaseChunker:
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 200,
separators: list[str] | None = None,
keep_separator: bool = True,
):
"""
Initialize the Chunker
Args:
chunk_size: Maximum size of each chunk
chunk_overlap: Number of characters to overlap between chunks
separators: List of separators to use for splitting
keep_separator: Whether to keep separators in the chunks
"""
self._splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=separators,
keep_separator=keep_separator,
)
def chunk(self, text: str) -> list[str]:
if not text or not text.strip():
return []
return self._splitter.split_text(text)

View File

@@ -1,12 +0,0 @@
from crewai_tools.rag.chunkers.base_chunker import BaseChunker
class DefaultChunker(BaseChunker):
def __init__(
self,
chunk_size: int = 2000,
chunk_overlap: int = 20,
separators: list[str] | None = None,
keep_separator: bool = True,
):
super().__init__(chunk_size, chunk_overlap, separators, keep_separator)

View File

@@ -1,66 +0,0 @@
from crewai_tools.rag.chunkers.base_chunker import BaseChunker
class CsvChunker(BaseChunker):
def __init__(
self,
chunk_size: int = 1200,
chunk_overlap: int = 100,
separators: list[str] | None = None,
keep_separator: bool = True,
):
if separators is None:
separators = [
"\nRow ", # Row boundaries (from CSVLoader format)
"\n", # Line breaks
" | ", # Column separators
", ", # Comma separators
" ", # Word breaks
"", # Character level
]
super().__init__(chunk_size, chunk_overlap, separators, keep_separator)
class JsonChunker(BaseChunker):
def __init__(
self,
chunk_size: int = 2000,
chunk_overlap: int = 200,
separators: list[str] | None = None,
keep_separator: bool = True,
):
if separators is None:
separators = [
"\n\n", # Object/array boundaries
"\n", # Line breaks
"},", # Object endings
"],", # Array endings
", ", # Property separators
": ", # Key-value separators
" ", # Word breaks
"", # Character level
]
super().__init__(chunk_size, chunk_overlap, separators, keep_separator)
class XmlChunker(BaseChunker):
def __init__(
self,
chunk_size: int = 2500,
chunk_overlap: int = 250,
separators: list[str] | None = None,
keep_separator: bool = True,
):
if separators is None:
separators = [
"\n\n", # Element boundaries
"\n", # Line breaks
">", # Tag endings
". ", # Sentence endings (for text content)
"! ", # Exclamation endings
"? ", # Question endings
", ", # Comma separators
" ", # Word breaks
"", # Character level
]
super().__init__(chunk_size, chunk_overlap, separators, keep_separator)

View File

@@ -1,76 +0,0 @@
from crewai_tools.rag.chunkers.base_chunker import BaseChunker
class TextChunker(BaseChunker):
def __init__(
self,
chunk_size: int = 1500,
chunk_overlap: int = 150,
separators: list[str] | None = None,
keep_separator: bool = True,
):
if separators is None:
separators = [
"\n\n\n", # Multiple line breaks (sections)
"\n\n", # Paragraph breaks
"\n", # Line breaks
". ", # Sentence endings
"! ", # Exclamation endings
"? ", # Question endings
"; ", # Semicolon breaks
", ", # Comma breaks
" ", # Word breaks
"", # Character level
]
super().__init__(chunk_size, chunk_overlap, separators, keep_separator)
class DocxChunker(BaseChunker):
def __init__(
self,
chunk_size: int = 2500,
chunk_overlap: int = 250,
separators: list[str] | None = None,
keep_separator: bool = True,
):
if separators is None:
separators = [
"\n\n\n", # Multiple line breaks (major sections)
"\n\n", # Paragraph breaks
"\n", # Line breaks
". ", # Sentence endings
"! ", # Exclamation endings
"? ", # Question endings
"; ", # Semicolon breaks
", ", # Comma breaks
" ", # Word breaks
"", # Character level
]
super().__init__(chunk_size, chunk_overlap, separators, keep_separator)
class MdxChunker(BaseChunker):
def __init__(
self,
chunk_size: int = 3000,
chunk_overlap: int = 300,
separators: list[str] | None = None,
keep_separator: bool = True,
):
if separators is None:
separators = [
"\n## ", # H2 headers (major sections)
"\n### ", # H3 headers (subsections)
"\n#### ", # H4 headers (sub-subsections)
"\n\n", # Paragraph breaks
"\n```", # Code block boundaries
"\n", # Line breaks
". ", # Sentence endings
"! ", # Exclamation endings
"? ", # Question endings
"; ", # Semicolon breaks
", ", # Comma breaks
" ", # Word breaks
"", # Character level
]
super().__init__(chunk_size, chunk_overlap, separators, keep_separator)

View File

@@ -1,25 +0,0 @@
from crewai_tools.rag.chunkers.base_chunker import BaseChunker
class WebsiteChunker(BaseChunker):
def __init__(
self,
chunk_size: int = 2500,
chunk_overlap: int = 250,
separators: list[str] | None = None,
keep_separator: bool = True,
):
if separators is None:
separators = [
"\n\n\n", # Major section breaks
"\n\n", # Paragraph breaks
"\n", # Line breaks
". ", # Sentence endings
"! ", # Exclamation endings
"? ", # Question endings
"; ", # Semicolon breaks
", ", # Comma breaks
" ", # Word breaks
"", # Character level
]
super().__init__(chunk_size, chunk_overlap, separators, keep_separator)

View File

@@ -1,251 +0,0 @@
import logging
from pathlib import Path
from typing import Any
from uuid import uuid4
import chromadb
import litellm
from pydantic import BaseModel, Field, PrivateAttr
from crewai_tools.rag.base_loader import BaseLoader
from crewai_tools.rag.chunkers.base_chunker import BaseChunker
from crewai_tools.rag.data_types import DataType
from crewai_tools.rag.misc import compute_sha256
from crewai_tools.rag.source_content import SourceContent
from crewai_tools.tools.rag.rag_tool import Adapter
logger = logging.getLogger(__name__)
class EmbeddingService:
def __init__(self, model: str = "text-embedding-3-small", **kwargs):
self.model = model
self.kwargs = kwargs
def embed_text(self, text: str) -> list[float]:
try:
response = litellm.embedding(model=self.model, input=[text], **self.kwargs)
return response.data[0]["embedding"]
except Exception as e:
logger.error(f"Error generating embedding: {e}")
raise
def embed_batch(self, texts: list[str]) -> list[list[float]]:
if not texts:
return []
try:
response = litellm.embedding(model=self.model, input=texts, **self.kwargs)
return [data["embedding"] for data in response.data]
except Exception as e:
logger.error(f"Error generating batch embeddings: {e}")
raise
class Document(BaseModel):
id: str = Field(default_factory=lambda: str(uuid4()))
content: str
metadata: dict[str, Any] = Field(default_factory=dict)
data_type: DataType = DataType.TEXT
source: str | None = None
class RAG(Adapter):
collection_name: str = "crewai_knowledge_base"
persist_directory: str | None = None
embedding_model: str = "text-embedding-3-large"
summarize: bool = False
top_k: int = 5
embedding_config: dict[str, Any] = Field(default_factory=dict)
_client: Any = PrivateAttr()
_collection: Any = PrivateAttr()
_embedding_service: EmbeddingService = PrivateAttr()
def model_post_init(self, __context: Any) -> None:
try:
if self.persist_directory:
self._client = chromadb.PersistentClient(path=self.persist_directory)
else:
self._client = chromadb.Client()
self._collection = self._client.get_or_create_collection(
name=self.collection_name,
metadata={
"hnsw:space": "cosine",
"description": "CrewAI Knowledge Base",
},
)
self._embedding_service = EmbeddingService(
model=self.embedding_model, **self.embedding_config
)
except Exception as e:
logger.error(f"Failed to initialize ChromaDB: {e}")
raise
super().model_post_init(__context)
def add(
self,
content: str | Path,
data_type: str | DataType | None = None,
metadata: dict[str, Any] | None = None,
loader: BaseLoader | None = None,
chunker: BaseChunker | None = None,
**kwargs: Any,
) -> None:
source_content = SourceContent(content)
data_type = self._get_data_type(data_type=data_type, content=source_content)
if not loader:
loader = data_type.get_loader()
if not chunker:
chunker = data_type.get_chunker()
loader_result = loader.load(source_content)
doc_id = loader_result.doc_id
existing_doc = self._collection.get(
where={"source": source_content.source_ref}, limit=1
)
existing_doc_id = (
existing_doc["metadatas"][0]["doc_id"]
if existing_doc and existing_doc["metadatas"]
else None
)
if existing_doc_id == doc_id:
logger.warning(
f"Document with source {loader_result.source} already exists"
)
return
# A document with the same source ref exists but its content has changed; delete the old reference
if existing_doc_id and existing_doc_id != loader_result.doc_id:
logger.warning(f"Deleting old document with doc_id {existing_doc_id}")
self._collection.delete(where={"doc_id": existing_doc_id})
documents = []
chunks = chunker.chunk(loader_result.content)
for i, chunk in enumerate(chunks):
doc_metadata = (metadata or {}).copy()
doc_metadata["chunk_index"] = i
documents.append(
Document(
id=compute_sha256(chunk),
content=chunk,
metadata=doc_metadata,
data_type=data_type,
source=loader_result.source,
)
)
if not documents:
logger.warning("No documents to add")
return
contents = [doc.content for doc in documents]
try:
embeddings = self._embedding_service.embed_batch(contents)
except Exception as e:
logger.error(f"Failed to generate embeddings: {e}")
return
ids = [doc.id for doc in documents]
metadatas = []
for doc in documents:
doc_metadata = doc.metadata.copy()
doc_metadata.update(
{
"data_type": doc.data_type.value,
"source": doc.source,
"doc_id": doc_id,
}
)
metadatas.append(doc_metadata)
try:
self._collection.add(
ids=ids,
embeddings=embeddings,
documents=contents,
metadatas=metadatas,
)
logger.info(f"Added {len(documents)} documents to knowledge base")
except Exception as e:
logger.error(f"Failed to add documents to ChromaDB: {e}")
def query(self, question: str, where: dict[str, Any] | None = None) -> str:
try:
question_embedding = self._embedding_service.embed_text(question)
results = self._collection.query(
query_embeddings=[question_embedding],
n_results=self.top_k,
where=where,
include=["documents", "metadatas", "distances"],
)
if (
not results
or not results.get("documents")
or not results["documents"][0]
):
return "No relevant content found."
documents = results["documents"][0]
metadatas = results.get("metadatas", [None])[0] or []
distances = results.get("distances", [None])[0] or []
# Return sources with relevance scores
formatted_results = []
for i, doc in enumerate(documents):
metadata = metadatas[i] if i < len(metadatas) else {}
distance = distances[i] if i < len(distances) else 1.0
source = metadata.get("source", "unknown") if metadata else "unknown"
score = (
1 - distance if distance is not None else 0
) # Convert distance to similarity
formatted_results.append(
f"[Source: {source}, Relevance: {score:.3f}]\n{doc}"
)
return "\n\n".join(formatted_results)
except Exception as e:
logger.error(f"Query failed: {e}")
return f"Error querying knowledge base: {e}"
def delete_collection(self) -> None:
try:
self._client.delete_collection(self.collection_name)
logger.info(f"Deleted collection: {self.collection_name}")
except Exception as e:
logger.error(f"Failed to delete collection: {e}")
def get_collection_info(self) -> dict[str, Any]:
try:
count = self._collection.count()
return {
"name": self.collection_name,
"count": count,
"embedding_model": self.embedding_model,
}
except Exception as e:
logger.error(f"Failed to get collection info: {e}")
return {"error": str(e)}
def _get_data_type(
self, content: SourceContent, data_type: str | DataType | None = None
) -> DataType:
try:
if isinstance(data_type, str):
return DataType(data_type)
except Exception:
pass
return content.data_type

View File

@@ -1,161 +0,0 @@
import os
from enum import Enum
from pathlib import Path
from urllib.parse import urlparse
from crewai_tools.rag.base_loader import BaseLoader
from crewai_tools.rag.chunkers.base_chunker import BaseChunker
class DataType(str, Enum):
PDF_FILE = "pdf_file"
TEXT_FILE = "text_file"
CSV = "csv"
JSON = "json"
XML = "xml"
DOCX = "docx"
MDX = "mdx"
# Database types
MYSQL = "mysql"
POSTGRES = "postgres"
# Repository types
GITHUB = "github"
DIRECTORY = "directory"
# Web types
WEBSITE = "website"
DOCS_SITE = "docs_site"
YOUTUBE_VIDEO = "youtube_video"
YOUTUBE_CHANNEL = "youtube_channel"
# Raw types
TEXT = "text"
def get_chunker(self) -> BaseChunker:
from importlib import import_module
chunkers = {
DataType.PDF_FILE: ("text_chunker", "TextChunker"),
DataType.TEXT_FILE: ("text_chunker", "TextChunker"),
DataType.TEXT: ("text_chunker", "TextChunker"),
DataType.DOCX: ("text_chunker", "DocxChunker"),
DataType.MDX: ("text_chunker", "MdxChunker"),
# Structured formats
DataType.CSV: ("structured_chunker", "CsvChunker"),
DataType.JSON: ("structured_chunker", "JsonChunker"),
DataType.XML: ("structured_chunker", "XmlChunker"),
DataType.WEBSITE: ("web_chunker", "WebsiteChunker"),
DataType.DIRECTORY: ("text_chunker", "TextChunker"),
DataType.YOUTUBE_VIDEO: ("text_chunker", "TextChunker"),
DataType.YOUTUBE_CHANNEL: ("text_chunker", "TextChunker"),
DataType.GITHUB: ("text_chunker", "TextChunker"),
DataType.DOCS_SITE: ("text_chunker", "TextChunker"),
DataType.MYSQL: ("text_chunker", "TextChunker"),
DataType.POSTGRES: ("text_chunker", "TextChunker"),
}
if self not in chunkers:
raise ValueError(f"No chunker defined for {self}")
module_name, class_name = chunkers[self]
module_path = f"crewai_tools.rag.chunkers.{module_name}"
try:
module = import_module(module_path)
return getattr(module, class_name)()
except Exception as e:
raise ValueError(f"Error loading chunker for {self}: {e}")
def get_loader(self) -> BaseLoader:
from importlib import import_module
loaders = {
DataType.PDF_FILE: ("pdf_loader", "PDFLoader"),
DataType.TEXT_FILE: ("text_loader", "TextFileLoader"),
DataType.TEXT: ("text_loader", "TextLoader"),
DataType.XML: ("xml_loader", "XMLLoader"),
DataType.WEBSITE: ("webpage_loader", "WebPageLoader"),
DataType.MDX: ("mdx_loader", "MDXLoader"),
DataType.JSON: ("json_loader", "JSONLoader"),
DataType.DOCX: ("docx_loader", "DOCXLoader"),
DataType.CSV: ("csv_loader", "CSVLoader"),
DataType.DIRECTORY: ("directory_loader", "DirectoryLoader"),
DataType.YOUTUBE_VIDEO: ("youtube_video_loader", "YoutubeVideoLoader"),
DataType.YOUTUBE_CHANNEL: (
"youtube_channel_loader",
"YoutubeChannelLoader",
),
DataType.GITHUB: ("github_loader", "GithubLoader"),
DataType.DOCS_SITE: ("docs_site_loader", "DocsSiteLoader"),
DataType.MYSQL: ("mysql_loader", "MySQLLoader"),
DataType.POSTGRES: ("postgres_loader", "PostgresLoader"),
}
if self not in loaders:
raise ValueError(f"No loader defined for {self}")
module_name, class_name = loaders[self]
module_path = f"crewai_tools.rag.loaders.{module_name}"
try:
module = import_module(module_path)
return getattr(module, class_name)()
except Exception as e:
raise ValueError(f"Error loading loader for {self}: {e}")
class DataTypes:
@staticmethod
def from_content(content: str | Path | None = None) -> DataType:
if content is None:
return DataType.TEXT
if isinstance(content, Path):
content = str(content)
is_url = False
if isinstance(content, str):
try:
url = urlparse(content)
is_url = (url.scheme and url.netloc) or url.scheme == "file"
except Exception:
pass
def get_file_type(path: str) -> DataType | None:
mapping = {
".pdf": DataType.PDF_FILE,
".csv": DataType.CSV,
".mdx": DataType.MDX,
".md": DataType.MDX,
".docx": DataType.DOCX,
".json": DataType.JSON,
".xml": DataType.XML,
".txt": DataType.TEXT_FILE,
}
for ext, dtype in mapping.items():
if path.endswith(ext):
return dtype
return None
if is_url:
dtype = get_file_type(url.path)
if dtype:
return dtype
if "docs" in url.netloc or ("docs" in url.path and url.scheme != "file"):
return DataType.DOCS_SITE
if "github.com" in url.netloc:
return DataType.GITHUB
return DataType.WEBSITE
if os.path.isfile(content):
dtype = get_file_type(content)
if dtype:
return dtype
if os.path.exists(content):
return DataType.TEXT_FILE
elif os.path.isdir(content):
return DataType.DIRECTORY
return DataType.TEXT

View File

@@ -1,26 +0,0 @@
from crewai_tools.rag.loaders.csv_loader import CSVLoader
from crewai_tools.rag.loaders.directory_loader import DirectoryLoader
from crewai_tools.rag.loaders.docx_loader import DOCXLoader
from crewai_tools.rag.loaders.json_loader import JSONLoader
from crewai_tools.rag.loaders.mdx_loader import MDXLoader
from crewai_tools.rag.loaders.pdf_loader import PDFLoader
from crewai_tools.rag.loaders.text_loader import TextFileLoader, TextLoader
from crewai_tools.rag.loaders.webpage_loader import WebPageLoader
from crewai_tools.rag.loaders.xml_loader import XMLLoader
from crewai_tools.rag.loaders.youtube_channel_loader import YoutubeChannelLoader
from crewai_tools.rag.loaders.youtube_video_loader import YoutubeVideoLoader
__all__ = [
"CSVLoader",
"DOCXLoader",
"DirectoryLoader",
"JSONLoader",
"MDXLoader",
"PDFLoader",
"TextFileLoader",
"TextLoader",
"WebPageLoader",
"XMLLoader",
"YoutubeChannelLoader",
"YoutubeVideoLoader",
]

View File

@@ -1,74 +0,0 @@
import csv
from io import StringIO
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class CSVLoader(BaseLoader):
def load(self, source_content: SourceContent, **kwargs) -> LoaderResult:
source_ref = source_content.source_ref
content_str = source_content.source
if source_content.is_url():
content_str = self._load_from_url(content_str, kwargs)
elif source_content.path_exists():
content_str = self._load_from_file(content_str)
return self._parse_csv(content_str, source_ref)
def _load_from_url(self, url: str, kwargs: dict) -> str:
import requests
headers = kwargs.get(
"headers",
{
"Accept": "text/csv, application/csv, text/plain",
"User-Agent": "Mozilla/5.0 (compatible; crewai-tools CSVLoader)",
},
)
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
return response.text
except Exception as e:
raise ValueError(f"Error fetching CSV from URL {url}: {e!s}")
def _load_from_file(self, path: str) -> str:
with open(path, "r", encoding="utf-8") as file:
return file.read()
def _parse_csv(self, content: str, source_ref: str) -> LoaderResult:
try:
csv_reader = csv.DictReader(StringIO(content))
text_parts = []
headers = csv_reader.fieldnames
if headers:
text_parts.append("Headers: " + " | ".join(headers))
text_parts.append("-" * 50)
for row_num, row in enumerate(csv_reader, 1):
row_text = " | ".join([f"{k}: {v}" for k, v in row.items() if v])
text_parts.append(f"Row {row_num}: {row_text}")
text = "\n".join(text_parts)
metadata = {
"format": "csv",
"columns": headers,
"rows": len(text_parts) - 2 if headers else 0,
}
except Exception as e:
text = content
metadata = {"format": "csv", "parse_error": str(e)}
return LoaderResult(
content=text,
source=source_ref,
metadata=metadata,
doc_id=self.generate_doc_id(source_ref=source_ref, content=text),
)

View File

@@ -1,166 +0,0 @@
import os
from pathlib import Path
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class DirectoryLoader(BaseLoader):
def load(self, source_content: SourceContent, **kwargs) -> LoaderResult:
"""
Load and process all files from a directory recursively.
Args:
source: Directory path or URL to a directory listing
**kwargs: Additional options:
- recursive: bool (default True) - Whether to search recursively
- include_extensions: list - Only include files with these extensions
- exclude_extensions: list - Exclude files with these extensions
- max_files: int - Maximum number of files to process
"""
source_ref = source_content.source_ref
if source_content.is_url():
raise ValueError(
"URL directory loading is not supported. Please provide a local directory path."
)
if not os.path.exists(source_ref):
raise FileNotFoundError(f"Directory does not exist: {source_ref}")
if not os.path.isdir(source_ref):
raise ValueError(f"Path is not a directory: {source_ref}")
return self._process_directory(source_ref, kwargs)
def _process_directory(self, dir_path: str, kwargs: dict) -> LoaderResult:
recursive = kwargs.get("recursive", True)
include_extensions = kwargs.get("include_extensions", None)
exclude_extensions = kwargs.get("exclude_extensions", None)
max_files = kwargs.get("max_files", None)
files = self._find_files(
dir_path, recursive, include_extensions, exclude_extensions
)
if max_files and len(files) > max_files:
files = files[:max_files]
all_contents = []
processed_files = []
errors = []
for file_path in files:
try:
result = self._process_single_file(file_path)
if result:
all_contents.append(f"=== File: {file_path} ===\n{result.content}")
processed_files.append(
{
"path": file_path,
"metadata": result.metadata,
"source": result.source,
}
)
except Exception as e:
error_msg = f"Error processing {file_path}: {e!s}"
errors.append(error_msg)
all_contents.append(f"=== File: {file_path} (ERROR) ===\n{error_msg}")
combined_content = "\n\n".join(all_contents)
metadata = {
"format": "directory",
"directory_path": dir_path,
"total_files": len(files),
"processed_files": len(processed_files),
"errors": len(errors),
"file_details": processed_files,
"error_details": errors,
}
return LoaderResult(
content=combined_content,
source=dir_path,
metadata=metadata,
doc_id=self.generate_doc_id(source_ref=dir_path, content=combined_content),
)
def _find_files(
self,
dir_path: str,
recursive: bool,
include_ext: list[str] | None = None,
exclude_ext: list[str] | None = None,
) -> list[str]:
"""Find all files in directory matching criteria."""
files = []
if recursive:
for root, dirs, filenames in os.walk(dir_path):
dirs[:] = [d for d in dirs if not d.startswith(".")]
for filename in filenames:
if self._should_include_file(filename, include_ext, exclude_ext):
files.append(os.path.join(root, filename))
else:
try:
for item in os.listdir(dir_path):
item_path = os.path.join(dir_path, item)
if os.path.isfile(item_path) and self._should_include_file(
item, include_ext, exclude_ext
):
files.append(item_path)
except PermissionError:
pass
return sorted(files)
def _should_include_file(
self,
filename: str,
include_ext: list[str] | None = None,
exclude_ext: list[str] | None = None,
) -> bool:
"""Determine if a file should be included based on criteria."""
if filename.startswith("."):
return False
_, ext = os.path.splitext(filename.lower())
if include_ext:
if ext not in [
e.lower() if e.startswith(".") else f".{e.lower()}" for e in include_ext
]:
return False
if exclude_ext:
if ext in [
e.lower() if e.startswith(".") else f".{e.lower()}" for e in exclude_ext
]:
return False
return True
def _process_single_file(self, file_path: str) -> LoaderResult:
from crewai_tools.rag.data_types import DataTypes
data_type = DataTypes.from_content(Path(file_path))
loader = data_type.get_loader()
result = loader.load(SourceContent(file_path))
if result.metadata is None:
result.metadata = {}
result.metadata.update(
{
"file_path": file_path,
"file_size": os.path.getsize(file_path),
"data_type": str(data_type),
"loader_type": loader.__class__.__name__,
}
)
return result
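
A usage sketch for the DirectoryLoader above; the directory and filter options are illustrative:

from crewai_tools.rag.loaders import DirectoryLoader
from crewai_tools.rag.source_content import SourceContent

loader = DirectoryLoader()
result = loader.load(
    SourceContent("./docs"),
    recursive=True,
    include_extensions=["md", ".txt"],  # bare extensions are normalized to ".md"/".txt"
    max_files=50,
)
print(result.metadata["processed_files"], result.metadata["errors"])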

View File

@@ -1,106 +0,0 @@
"""Documentation site loader."""
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class DocsSiteLoader(BaseLoader):
"""Loader for documentation websites."""
def load(self, source: SourceContent, **kwargs) -> LoaderResult:
"""Load content from a documentation site.
Args:
source: Documentation site URL
**kwargs: Additional arguments
Returns:
LoaderResult with documentation content
"""
docs_url = source.source
try:
response = requests.get(docs_url, timeout=30)
response.raise_for_status()
except requests.RequestException as e:
raise ValueError(f"Unable to fetch documentation from {docs_url}: {e}")
soup = BeautifulSoup(response.text, "html.parser")
for script in soup(["script", "style"]):
script.decompose()
title = soup.find("title")
title_text = title.get_text(strip=True) if title else "Documentation"
main_content = None
for selector in [
"main",
"article",
'[role="main"]',
".content",
"#content",
".documentation",
]:
main_content = soup.select_one(selector)
if main_content:
break
if not main_content:
main_content = soup.find("body")
if not main_content:
raise ValueError(
f"Unable to extract content from documentation site: {docs_url}"
)
text_parts = [f"Title: {title_text}", ""]
headings = main_content.find_all(["h1", "h2", "h3"])
if headings:
text_parts.append("Table of Contents:")
for heading in headings[:15]:
level = int(heading.name[1])
indent = " " * (level - 1)
text_parts.append(f"{indent}- {heading.get_text(strip=True)}")
text_parts.append("")
text = main_content.get_text(separator="\n", strip=True)
lines = [line.strip() for line in text.split("\n") if line.strip()]
text_parts.extend(lines)
nav_links = []
for nav_selector in ["nav", ".sidebar", ".toc", ".navigation"]:
nav = soup.select_one(nav_selector)
if nav:
links = nav.find_all("a", href=True)
for link in links[:20]:
href = link["href"]
if not href.startswith(("http://", "https://", "mailto:", "#")):
full_url = urljoin(docs_url, href)
nav_links.append(f"- {link.get_text(strip=True)}: {full_url}")
if nav_links:
text_parts.append("")
text_parts.append("Related documentation pages:")
text_parts.extend(nav_links[:10])
content = "\n".join(text_parts)
if len(content) > 100000:
content = content[:100000] + "\n\n[Content truncated...]"
return LoaderResult(
content=content,
metadata={
"source": docs_url,
"title": title_text,
"domain": urlparse(docs_url).netloc,
},
doc_id=self.generate_doc_id(source_ref=docs_url, content=content),
)
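
A usage sketch for the DocsSiteLoader above. The module path is an assumption (this loader is not re-exported in the package __init__ shown earlier), and the URL is illustrative:

from crewai_tools.rag.loaders.docs_site_loader import DocsSiteLoader  # module path assumed
from crewai_tools.rag.source_content import SourceContent

loader = DocsSiteLoader()
result = loader.load(SourceContent("https://docs.example.com/"))
print(result.metadata["title"], result.metadata["domain"])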

View File

@@ -1,81 +0,0 @@
import os
import tempfile
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class DOCXLoader(BaseLoader):
def load(self, source_content: SourceContent, **kwargs) -> LoaderResult:
try:
from docx import Document as DocxDocument
except ImportError:
raise ImportError(
"python-docx is required for DOCX loading. Install with: 'uv pip install python-docx' or pip install crewai-tools[rag]"
)
source_ref = source_content.source_ref
if source_content.is_url():
temp_file = self._download_from_url(source_ref, kwargs)
try:
return self._load_from_file(temp_file, source_ref, DocxDocument)
finally:
os.unlink(temp_file)
elif source_content.path_exists():
return self._load_from_file(source_ref, source_ref, DocxDocument)
else:
raise ValueError(
f"Source must be a valid file path or URL, got: {source_content.source}"
)
def _download_from_url(self, url: str, kwargs: dict) -> str:
import requests
headers = kwargs.get(
"headers",
{
"Accept": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"User-Agent": "Mozilla/5.0 (compatible; crewai-tools DOCXLoader)",
},
)
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
# Create temporary file to save the DOCX content
with tempfile.NamedTemporaryFile(suffix=".docx", delete=False) as temp_file:
temp_file.write(response.content)
return temp_file.name
except Exception as e:
raise ValueError(f"Error fetching DOCX from URL {url}: {e!s}")
def _load_from_file(
self, file_path: str, source_ref: str, DocxDocument
) -> LoaderResult:
try:
doc = DocxDocument(file_path)
text_parts = []
for paragraph in doc.paragraphs:
if paragraph.text.strip():
text_parts.append(paragraph.text)
content = "\n".join(text_parts)
metadata = {
"format": "docx",
"paragraphs": len(doc.paragraphs),
"tables": len(doc.tables),
}
return LoaderResult(
content=content,
source=source_ref,
metadata=metadata,
doc_id=self.generate_doc_id(source_ref=source_ref, content=content),
)
except Exception as e:
raise ValueError(f"Error loading DOCX file: {e!s}")

View File

@@ -1,112 +0,0 @@
"""GitHub repository content loader."""
from github import Github, GithubException
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class GithubLoader(BaseLoader):
"""Loader for GitHub repository content."""
def load(self, source: SourceContent, **kwargs) -> LoaderResult:
"""Load content from a GitHub repository.
Args:
source: GitHub repository URL
**kwargs: Additional arguments including gh_token and content_types
Returns:
LoaderResult with repository content
"""
metadata = kwargs.get("metadata", {})
gh_token = metadata.get("gh_token")
content_types = metadata.get("content_types", ["code", "repo"])
repo_url = source.source
if not repo_url.startswith("https://github.com/"):
raise ValueError(f"Invalid GitHub URL: {repo_url}")
parts = repo_url.replace("https://github.com/", "").strip("/").split("/")
if len(parts) < 2:
raise ValueError(f"Invalid GitHub repository URL: {repo_url}")
repo_name = f"{parts[0]}/{parts[1]}"
g = Github(gh_token) if gh_token else Github()
try:
repo = g.get_repo(repo_name)
except GithubException as e:
raise ValueError(f"Unable to access repository {repo_name}: {e}")
all_content = []
if "repo" in content_types:
all_content.append(f"Repository: {repo.full_name}")
all_content.append(f"Description: {repo.description or 'No description'}")
all_content.append(f"Language: {repo.language or 'Not specified'}")
all_content.append(f"Stars: {repo.stargazers_count}")
all_content.append(f"Forks: {repo.forks_count}")
all_content.append("")
if "code" in content_types:
try:
readme = repo.get_readme()
all_content.append("README:")
all_content.append(
readme.decoded_content.decode("utf-8", errors="ignore")
)
all_content.append("")
except GithubException:
pass
try:
contents = repo.get_contents("")
if isinstance(contents, list):
all_content.append("Repository structure:")
for content_file in contents[:20]:
all_content.append(
f"- {content_file.path} ({content_file.type})"
)
all_content.append("")
except GithubException:
pass
if "pr" in content_types:
prs = repo.get_pulls(state="open")
pr_list = list(prs[:5])
if pr_list:
all_content.append("Recent Pull Requests:")
for pr in pr_list:
all_content.append(f"- PR #{pr.number}: {pr.title}")
if pr.body:
body_preview = pr.body[:200].replace("\n", " ")
all_content.append(f" {body_preview}")
all_content.append("")
if "issue" in content_types:
issues = repo.get_issues(state="open")
issue_list = [i for i in list(issues[:10]) if not i.pull_request][:5]
if issue_list:
all_content.append("Recent Issues:")
for issue in issue_list:
all_content.append(f"- Issue #{issue.number}: {issue.title}")
if issue.body:
body_preview = issue.body[:200].replace("\n", " ")
all_content.append(f" {body_preview}")
all_content.append("")
if not all_content:
raise ValueError(f"No content could be loaded from repository: {repo_url}")
content = "\n".join(all_content)
return LoaderResult(
content=content,
metadata={
"source": repo_url,
"repo": repo_name,
"content_types": content_types,
},
doc_id=self.generate_doc_id(source_ref=repo_url, content=content),
)
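
A usage sketch for the GithubLoader above. The module path and token placeholder are assumptions; the repository URL and content types mirror the options the loader reads from the metadata kwarg:

from crewai_tools.rag.loaders.github_loader import GithubLoader  # module path assumed
from crewai_tools.rag.source_content import SourceContent

loader = GithubLoader()
result = loader.load(
    SourceContent("https://github.com/octocat/Hello-World"),
    metadata={"gh_token": "<personal-access-token>", "content_types": ["repo", "code", "issue"]},
)
print(result.metadata["repo"], result.metadata["content_types"])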

View File

@@ -1,78 +0,0 @@
import json
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class JSONLoader(BaseLoader):
def load(self, source_content: SourceContent, **kwargs) -> LoaderResult:
source_ref = source_content.source_ref
content = source_content.source
if source_content.is_url():
content = self._load_from_url(source_ref, kwargs)
elif source_content.path_exists():
content = self._load_from_file(source_ref)
return self._parse_json(content, source_ref)
def _load_from_url(self, url: str, kwargs: dict) -> str:
import requests
headers = kwargs.get(
"headers",
{
"Accept": "application/json",
"User-Agent": "Mozilla/5.0 (compatible; crewai-tools JSONLoader)",
},
)
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
return (
response.text
if not self._is_json_response(response)
else json.dumps(response.json(), indent=2)
)
except Exception as e:
raise ValueError(f"Error fetching JSON from URL {url}: {e!s}")
def _is_json_response(self, response) -> bool:
try:
response.json()
return True
except ValueError:
return False
def _load_from_file(self, path: str) -> str:
with open(path, "r", encoding="utf-8") as file:
return file.read()
def _parse_json(self, content: str, source_ref: str) -> LoaderResult:
try:
data = json.loads(content)
if isinstance(data, dict):
text = "\n".join(
f"{k}: {json.dumps(v, indent=0)}" for k, v in data.items()
)
elif isinstance(data, list):
text = "\n".join(json.dumps(item, indent=0) for item in data)
else:
text = json.dumps(data, indent=0)
metadata = {
"format": "json",
"type": type(data).__name__,
"size": len(data) if isinstance(data, (list, dict)) else 1,
}
except json.JSONDecodeError as e:
text = content
metadata = {"format": "json", "parse_error": str(e)}
return LoaderResult(
content=text,
source=source_ref,
metadata=metadata,
doc_id=self.generate_doc_id(source_ref=source_ref, content=text),
)
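
A usage sketch for the JSONLoader above; the URL is illustrative:

from crewai_tools.rag.loaders import JSONLoader
from crewai_tools.rag.source_content import SourceContent

loader = JSONLoader()
result = loader.load(SourceContent("https://api.example.com/items.json"))
print(result.metadata["type"], result.metadata["size"])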

View File

@@ -1,67 +0,0 @@
import re
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class MDXLoader(BaseLoader):
def load(self, source_content: SourceContent, **kwargs) -> LoaderResult:
source_ref = source_content.source_ref
content = source_content.source
if source_content.is_url():
content = self._load_from_url(source_ref, kwargs)
elif source_content.path_exists():
content = self._load_from_file(source_ref)
return self._parse_mdx(content, source_ref)
def _load_from_url(self, url: str, kwargs: dict) -> str:
import requests
headers = kwargs.get(
"headers",
{
"Accept": "text/markdown, text/x-markdown, text/plain",
"User-Agent": "Mozilla/5.0 (compatible; crewai-tools MDXLoader)",
},
)
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
return response.text
except Exception as e:
raise ValueError(f"Error fetching MDX from URL {url}: {e!s}")
def _load_from_file(self, path: str) -> str:
with open(path, "r", encoding="utf-8") as file:
return file.read()
def _parse_mdx(self, content: str, source_ref: str) -> LoaderResult:
cleaned_content = content
# Remove import statements
cleaned_content = re.sub(
r"^import\s+.*?\n", "", cleaned_content, flags=re.MULTILINE
)
# Remove export statements
cleaned_content = re.sub(
r"^export\s+.*?(?:\n|$)", "", cleaned_content, flags=re.MULTILINE
)
# Remove JSX tags (simple approach)
cleaned_content = re.sub(r"<[^>]+>", "", cleaned_content)
# Clean up extra whitespace
cleaned_content = re.sub(r"\n\s*\n\s*\n", "\n\n", cleaned_content)
cleaned_content = cleaned_content.strip()
metadata = {"format": "mdx"}
return LoaderResult(
content=cleaned_content,
source=source_ref,
metadata=metadata,
doc_id=self.generate_doc_id(source_ref=source_ref, content=cleaned_content),
)
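
A usage sketch for the MDXLoader above; the file path is illustrative:

from crewai_tools.rag.loaders import MDXLoader
from crewai_tools.rag.source_content import SourceContent

loader = MDXLoader()
result = loader.load(SourceContent("docs/getting-started.mdx"))
# import/export statements and JSX tags have been stripped from result.content
print(result.content[:200])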

View File

@@ -1,100 +0,0 @@
"""MySQL database loader."""
from urllib.parse import urlparse
import pymysql
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class MySQLLoader(BaseLoader):
"""Loader for MySQL database content."""
def load(self, source: SourceContent, **kwargs) -> LoaderResult:
"""Load content from a MySQL database table.
Args:
source: SQL query (e.g., "SELECT * FROM table_name")
**kwargs: Additional arguments including db_uri
Returns:
LoaderResult with database content
"""
metadata = kwargs.get("metadata", {})
db_uri = metadata.get("db_uri")
if not db_uri:
raise ValueError("Database URI is required for MySQL loader")
query = source.source
parsed = urlparse(db_uri)
if parsed.scheme not in ["mysql", "mysql+pymysql"]:
raise ValueError(f"Invalid MySQL URI scheme: {parsed.scheme}")
connection_params = {
"host": parsed.hostname or "localhost",
"port": parsed.port or 3306,
"user": parsed.username,
"password": parsed.password,
"database": parsed.path.lstrip("/") if parsed.path else None,
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
if not connection_params["database"]:
raise ValueError("Database name is required in the URI")
try:
connection = pymysql.connect(**connection_params)
try:
with connection.cursor() as cursor:
cursor.execute(query)
rows = cursor.fetchall()
if not rows:
content = "No data found in the table"
return LoaderResult(
content=content,
metadata={"source": query, "row_count": 0},
doc_id=self.generate_doc_id(
source_ref=query, content=content
),
)
text_parts = []
columns = list(rows[0].keys())
text_parts.append(f"Columns: {', '.join(columns)}")
text_parts.append(f"Total rows: {len(rows)}")
text_parts.append("")
for i, row in enumerate(rows, 1):
text_parts.append(f"Row {i}:")
for col, val in row.items():
if val is not None:
text_parts.append(f" {col}: {val}")
text_parts.append("")
content = "\n".join(text_parts)
if len(content) > 100000:
content = content[:100000] + "\n\n[Content truncated...]"
return LoaderResult(
content=content,
metadata={
"source": query,
"database": connection_params["database"],
"row_count": len(rows),
"columns": columns,
},
doc_id=self.generate_doc_id(source_ref=query, content=content),
)
finally:
connection.close()
except pymysql.Error as e:
raise ValueError(f"MySQL database error: {e}")
except Exception as e:
raise ValueError(f"Failed to load data from MySQL: {e}")

View File

@@ -1,71 +0,0 @@
"""PDF loader for extracting text from PDF files."""
import os
from pathlib import Path
from typing import Any
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class PDFLoader(BaseLoader):
"""Loader for PDF files."""
def load(self, source: SourceContent, **kwargs) -> LoaderResult:
"""Load and extract text from a PDF file.
Args:
source: The source content containing the PDF file path
Returns:
LoaderResult with extracted text content
Raises:
FileNotFoundError: If the PDF file doesn't exist
ImportError: If required PDF libraries aren't installed
"""
try:
import pypdf
except ImportError:
try:
import PyPDF2 as pypdf
except ImportError:
raise ImportError(
"PDF support requires pypdf or PyPDF2. Install with: uv add pypdf"
)
file_path = source.source
if not os.path.isfile(file_path):
raise FileNotFoundError(f"PDF file not found: {file_path}")
text_content = []
metadata: dict[str, Any] = {
"source": str(file_path),
"file_name": Path(file_path).name,
"file_type": "pdf",
}
try:
with open(file_path, "rb") as file:
pdf_reader = pypdf.PdfReader(file)
metadata["num_pages"] = len(pdf_reader.pages)
for page_num, page in enumerate(pdf_reader.pages, 1):
page_text = page.extract_text()
if page_text.strip():
text_content.append(f"Page {page_num}:\n{page_text}")
except Exception as e:
raise ValueError(f"Error reading PDF file {file_path}: {e!s}")
if not text_content:
content = f"[PDF file with no extractable text: {Path(file_path).name}]"
else:
content = "\n\n".join(text_content)
return LoaderResult(
content=content,
source=str(file_path),
metadata=metadata,
doc_id=self.generate_doc_id(source_ref=str(file_path), content=content),
)
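
A usage sketch for the PDFLoader above; the file path is illustrative and pypdf must be installed:

from crewai_tools.rag.loaders import PDFLoader
from crewai_tools.rag.source_content import SourceContent

loader = PDFLoader()
result = loader.load(SourceContent("manuals/guide.pdf"))
print(result.metadata["num_pages"])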

View File

@@ -1,100 +0,0 @@
"""PostgreSQL database loader."""
from urllib.parse import urlparse
import psycopg2
from psycopg2.extras import RealDictCursor
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class PostgresLoader(BaseLoader):
"""Loader for PostgreSQL database content."""
def load(self, source: SourceContent, **kwargs) -> LoaderResult:
"""Load content from a PostgreSQL database table.
Args:
source: SQL query (e.g., "SELECT * FROM table_name")
**kwargs: Additional arguments including db_uri
Returns:
LoaderResult with database content
"""
metadata = kwargs.get("metadata", {})
db_uri = metadata.get("db_uri")
if not db_uri:
raise ValueError("Database URI is required for PostgreSQL loader")
query = source.source
parsed = urlparse(db_uri)
if parsed.scheme not in ["postgresql", "postgres", "postgresql+psycopg2"]:
raise ValueError(f"Invalid PostgreSQL URI scheme: {parsed.scheme}")
connection_params = {
"host": parsed.hostname or "localhost",
"port": parsed.port or 5432,
"user": parsed.username,
"password": parsed.password,
"database": parsed.path.lstrip("/") if parsed.path else None,
"cursor_factory": RealDictCursor,
}
if not connection_params["database"]:
raise ValueError("Database name is required in the URI")
try:
connection = psycopg2.connect(**connection_params)
try:
with connection.cursor() as cursor:
cursor.execute(query)
rows = cursor.fetchall()
if not rows:
content = "No data found in the table"
return LoaderResult(
content=content,
metadata={"source": query, "row_count": 0},
doc_id=self.generate_doc_id(
source_ref=query, content=content
),
)
text_parts = []
columns = list(rows[0].keys())
text_parts.append(f"Columns: {', '.join(columns)}")
text_parts.append(f"Total rows: {len(rows)}")
text_parts.append("")
for i, row in enumerate(rows, 1):
text_parts.append(f"Row {i}:")
for col, val in row.items():
if val is not None:
text_parts.append(f" {col}: {val}")
text_parts.append("")
content = "\n".join(text_parts)
if len(content) > 100000:
content = content[:100000] + "\n\n[Content truncated...]"
return LoaderResult(
content=content,
metadata={
"source": query,
"database": connection_params["database"],
"row_count": len(rows),
"columns": columns,
},
doc_id=self.generate_doc_id(source_ref=query, content=content),
)
finally:
connection.close()
except psycopg2.Error as e:
raise ValueError(f"PostgreSQL database error: {e}")
except Exception as e:
raise ValueError(f"Failed to load data from PostgreSQL: {e}")

View File

@@ -1,28 +0,0 @@
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class TextFileLoader(BaseLoader):
def load(self, source_content: SourceContent, **kwargs) -> LoaderResult:
source_ref = source_content.source_ref
if not source_content.path_exists():
raise FileNotFoundError(f"The following file does not exist: {source_content.source}")
with open(source_content.source, "r", encoding="utf-8") as file:
content = file.read()
return LoaderResult(
content=content,
source=source_ref,
doc_id=self.generate_doc_id(source_ref=source_ref, content=content)
)
class TextLoader(BaseLoader):
def load(self, source_content: SourceContent, **kwargs) -> LoaderResult:
return LoaderResult(
content=source_content.source,
source=source_content.source_ref,
doc_id=self.generate_doc_id(content=source_content.source)
)

View File

@@ -1,54 +0,0 @@
import re
import requests
from bs4 import BeautifulSoup
from crewai_tools.rag.base_loader import BaseLoader, LoaderResult
from crewai_tools.rag.source_content import SourceContent
class WebPageLoader(BaseLoader):
def load(self, source_content: SourceContent, **kwargs) -> LoaderResult:
url = source_content.source
headers = kwargs.get(
"headers",
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"Accept-Language": "en-US,en;q=0.9",
},
)
try:
response = requests.get(url, timeout=15, headers=headers)
response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.text, "html.parser")
for script in soup(["script", "style"]):
script.decompose()
text = soup.get_text(" ")
text = re.sub("[ \t]+", " ", text)
text = re.sub("\\s+\n\\s+", "\n", text)
text = text.strip()
title = (
soup.title.string.strip() if soup.title and soup.title.string else ""
)
metadata = {
"url": url,
"title": title,
"status_code": response.status_code,
"content_type": response.headers.get("content-type", ""),
}
return LoaderResult(
content=text,
source=url,
metadata=metadata,
doc_id=self.generate_doc_id(source_ref=url, content=text),
)
except Exception as e:
raise ValueError(f"Error loading webpage {url}: {e!s}")

Some files were not shown because too many files have changed in this diff.