Compare commits

...

23 Commits

Author SHA1 Message Date
Devin AI
ace1f48cf2 fix: restore typing imports to resolve F821 undefined name errors
- Add back Dict, List, Tuple, Type imports that are used throughout the code
- Consolidate duplicate imports in llm.py
- Match existing codebase patterns for type annotations
- Maintain quota limit handling functionality while fixing CI issues

Co-Authored-By: João <joao@crewai.com>
2025-09-02 16:54:27 +00:00
Devin AI
9af8e64205 fix: resolve lint issues with deprecated typing imports
- Add missing type imports (Dict, List, Type, DefaultDict, Tuple) to fix F821 undefined name errors
- Consolidate typing imports to avoid duplicate imports
- Remove unused imports to fix F401 and F811 errors
- Maintain quota limit handling functionality while fixing CI lint failures

Co-Authored-By: João <joao@crewai.com>
2025-09-02 16:52:10 +00:00
Devin AI
c763457e8d feat: Add graceful quota limit handling for LLM APIs
- Create LLMQuotaLimitExceededException following CrewAI's existing pattern
- Add quota limit error handling in both streaming and non-streaming LLM calls
- Update error handling in agent execution and crew agent executor
- Add comprehensive tests for quota limit scenarios
- Fixes issue #3434: Handle RateLimitError gracefully instead of crashing

The implementation catches litellm.exceptions.RateLimitError and converts it
to a CrewAI-specific exception, allowing tasks to detect quota limits and
shut down gracefully instead of crashing with unhandled exceptions.

Co-Authored-By: João <joao@crewai.com>
2025-09-02 16:45:35 +00:00
Greyson LaLonde
92d71f7f06 chore: migrate CI workflows to uv and update dev tooling (#3426)
chore(dev): update tooling & CI workflows

- Upgrade ruff, mypy (strict), pre-commit; add hooks, stubs, config consolidation
- Add bandit to dev deps and update uv.lock
- Enhance ruff rules (modern Python style, B006 for mutable defaults)
- Update workflows to use uv, matrix strategy, and changed-file type checking
- Include tests in type checking; fix job names and add summary job for branch protection
2025-09-02 12:35:02 -04:00
ZhangYier
dada9f140f fix: README.md example link 404 (#3432)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-09-02 10:29:40 -04:00
Greyson LaLonde
878c1a649a refactor: Move events module to crewai.events (#3425)
refactor(events): relocate events module & update imports

- Move events from utilities/ to top-level events/ with types/, listeners/, utils/ structure
- Update all source/tests/docs to new import paths
- Add backwards compatibility stubs in crewai.utilities.events with deprecation warnings
- Restore test mocks and fix related test imports
2025-09-02 10:06:42 -04:00
Greyson LaLonde
1b1a8fdbf4 fix: replace mutable default arguments with None (#3429)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2025-08-31 18:57:45 -04:00
Lorenze Jay
2633b33afc fix: enhance LLM event handling with task and agent metadata (#3422)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* fix: enhance LLM event handling with task and agent metadata

- Added `from_task` and `from_agent` parameters to LLM event emissions for improved traceability.
- Updated `_send_events_to_backend` method in TraceBatchManager to return status codes for better error handling.
- Modified `CREWAI_BASE_URL` to remove trailing slash for consistency.
- Improved logging and graceful failure handling in event sending process.

* drop print
2025-08-29 13:48:49 -07:00
Greyson LaLonde
e4c4b81e63 chore: refactor parser & constants, improve tools_handler, update tests
- Move parser constants to dedicated module with pre-compiled regex
- Refactor CrewAgentParser to module functions; remove unused params
- Improve tools_handler with instance attributes
- Update tests to use module-level parser functions
2025-08-29 14:35:08 -04:00
Greyson LaLonde
ec1eff02a8 fix: achieve parity between rag package and current impl (#3418)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
- Sanitize ChromaDB collection names and use original dir naming
- Add persistent client with file locking to the ChromaDB factory
- Add upsert support to the ChromaDB client
- Suppress ChromaDB deprecation warnings for `model_fields`
- Extract `suppress_logging` into shared `logger_utils`
- Update tests to reflect upsert behavior
- Docs: add additional note
2025-08-28 11:22:36 -04:00
Lorenze Jay
0f1b764c3e chore: update crewAI version and dependencies to 0.175.0 and tools to 0.65.0 (#3417)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* Bump crewAI version from 0.165.1 to 0.175.0 in __init__.py.
* Update tools dependency from 0.62.1 to 0.65.0 in pyproject.toml and uv.lock files.
* Reflect changes in CLI templates for crew, flow, and tool configurations.
2025-08-27 19:33:32 -07:00
Lorenze Jay
6ee9db1d4a fix: enhance PlusAPI and TraceBatchManager with timeout handling and graceful failure logging (#3416)
* Added timeout parameters to PlusAPI trace event methods for improved reliability.
* Updated TraceBatchManager to handle None responses gracefully, logging warnings instead of errors.
* Improved logging messages to provide clearer context during trace batch initialization and event sending failures.
2025-08-27 18:43:03 -07:00
Greyson LaLonde
109de91d08 fix: batch entity memory items to reduce redundant operations (#3409)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
* fix: batch save entity memory items to reduce redundant operations

* test: update memory event count after entity batch save implementation
2025-08-27 10:47:20 -04:00
Erika Shorten
92b70e652d Add hybrid search alpha parameter to the docs (#3397)
Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-08-27 10:36:39 -04:00
Heitor Carvalho
fc3f2c49d2 chore: remove auth0 and the need of typing the email on 'crewai login' (#3408)
* Remove the need of typing the email on 'crewai login'

* Remove auth0 constants, update tests
2025-08-27 10:12:57 -04:00
Lucas Gomide
88d2968fd5 chore: add deprecation notices to Task.max_retries (#3379)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2025-08-26 17:24:58 -04:00
Lorenze Jay
7addda9398 Lorenze/better tracing events (#3382)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
* feat: implement tool usage limit exception handling

- Introduced `ToolUsageLimitExceeded` exception to manage maximum usage limits for tools.
- Enhanced `CrewStructuredTool` to check and raise this exception when the usage limit is reached.
- Updated `_run` and `_execute` methods to include usage limit checks and handle exceptions appropriately, improving reliability and user feedback.

* feat: enhance PlusAPI and ToolUsage with task metadata

- Removed the `send_trace_batch` method from PlusAPI to streamline the API.
- Added timeout parameters to trace event methods in PlusAPI for improved reliability.
- Updated ToolUsage to include task metadata (task name and ID) in event emissions, enhancing traceability and context during tool usage.
- Refactored event handling in LLM and ToolUsage events to ensure task information is consistently captured.

* feat: enhance memory and event handling with task and agent metadata

- Added task and agent metadata to various memory and event classes, improving traceability and context during memory operations.
- Updated the `ContextualMemory` and `Memory` classes to associate tasks and agents, allowing for better context management.
- Enhanced event emissions in `LLM`, `ToolUsage`, and memory events to include task and agent information, facilitating improved debugging and monitoring.
- Refactored event handling to ensure consistent capture of task and agent details across the system.

* drop

* refactor: clean up unused imports in memory and event modules

- Removed unused TYPE_CHECKING imports from long_term_memory.py to streamline the code.
- Eliminated unnecessary import from memory_events.py, enhancing clarity and maintainability.

* fix memory tests

* fix task_completed payload

* fix: remove unused test agent variable in external memory tests

* refactor: remove unused agent parameter from Memory class save method

- Eliminated the agent parameter from the save method in the Memory class to streamline the code and improve clarity.
- Updated the TraceBatchManager class by moving initialization of attributes into the constructor for better organization and readability.

* refactor: enhance ExecutionState and ReasoningEvent classes with optional task and agent identifiers

- Added optional `current_agent_id` and `current_task_id` attributes to the `ExecutionState` class for better tracking of agent and task states.
- Updated the `from_task` attribute in the `ReasoningEvent` class to use `Optional[Any]` instead of a specific type, improving flexibility in event handling.

* refactor: update ExecutionState class by removing unused agent and task identifiers

- Removed the `current_agent_id` and `current_task_id` attributes from the `ExecutionState` class to simplify the code and enhance clarity.
- Adjusted the import statements to include `Optional` for better type handling.

* refactor: streamline LLM event handling in LiteAgent

- Removed unused LLM event emissions (LLMCallStartedEvent, LLMCallCompletedEvent, LLMCallFailedEvent) from the LiteAgent class to simplify the code and improve performance.
- Adjusted the flow of LLM response handling by eliminating unnecessary event bus interactions, enhancing clarity and maintainability.

* flow ownership and not emitting events when a crew is done

* refactor: remove unused agent parameter from ShortTermMemory save method

- Eliminated the agent parameter from the save method in the ShortTermMemory class to streamline the code and improve clarity.
- This change enhances the maintainability of the memory management system by reducing unnecessary complexity.

* runtype check fix

* fixing tests

* fix lints

* fix: update event assertions in test_llm_emits_event_with_lite_agent

- Adjusted the expected counts for completed and started events in the test to reflect the correct behavior of the LiteAgent.
- Updated assertions for agent roles and IDs to match the expected values after recent changes in event handling.

* fix: update task name assertions in event tests

- Modified assertions in `test_stream_llm_emits_event_with_task_and_agent_info` and `test_llm_emits_event_with_task_and_agent_info` to use `task.description` as a fallback for `task.name`. This ensures that the tests correctly validate the task name even when it is not explicitly set.

* fix: update test assertions for output values and improve readability

- Updated assertions in `test_output_json_dict_hierarchical` to reflect the correct expected score value.
- Enhanced readability of assertions in `test_output_pydantic_to_another_task` and `test_key` by formatting the error messages for clarity.
- These changes ensure that the tests accurately validate the expected outputs and improve overall code quality.

* test fixes

* fix crew_test

* added another fixture

* fix: ensure agent and task assignments in contextual memory are conditional

- Updated the ContextualMemory class to check for the existence of short-term, long-term, external, and extended memory before assigning agent and task attributes. This prevents potential attribute errors when memory types are not initialized.
2025-08-26 09:09:46 -07:00
Greyson LaLonde
4b4a119a9f refactor: simplify rag client initialization (#3401)
* Simplified Qdrant and ChromaDB client initialization
* Refactored factory structure and updated tests accordingly
2025-08-26 08:54:51 -04:00
Greyson LaLonde
869bb115c8 Qdrant RAG Provider Support (#3400)
* Added Qdrant provider support with factory, config, and protocols
* Improved default embeddings and type definitions
* Fixed ChromaDB factory embedding assignment
2025-08-26 08:44:02 -04:00
Greyson LaLonde
7ac482c7c9 feat: rag configuration with optional dependency support (#3394)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
### RAG Config System

* Added ChromaDB client creation via config with sensible defaults
* Introduced optional imports and shared RAG config utilities/schema
* Enabled embedding function support with ChromaDB provider integration
* Refactored configs for immutability and stronger type safety
* Removed unused code and expanded test coverage
2025-08-26 00:00:22 -04:00
Greyson LaLonde
2e4bd3f49d feat: qdrant generic client (#3377)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
### Qdrant Client

* Add core client with collection, search, and document APIs (sync + async)
* Refactor utilities, types, and vector params (default 384-dim)
* Improve error handling with `ClientMethodMismatchError`
* Add score normalization, async embeddings, and optional `qdrant-client` dep
* Expand tests and type safety throughout
2025-08-25 16:02:25 -04:00
Greyson LaLonde
c02997d956 Add import utilities for optional dependencies (#3389)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2025-08-24 22:57:44 -04:00
Heitor Carvalho
f96b779df5 feat: reset tokens on crewai config reset (#3365)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2025-08-22 16:16:42 -04:00
179 changed files with 10218 additions and 2042 deletions

View File

@@ -15,8 +15,19 @@ jobs:
- name: Fetch Target Branch
run: git fetch origin $TARGET_BRANCH --depth=1
- name: Install Ruff
run: pip install ruff
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
enable-cache: true
cache-dependency-glob: |
**/pyproject.toml
**/uv.lock
- name: Set up Python
run: uv python install 3.11
- name: Install dependencies
run: uv sync --dev --no-install-project
- name: Get Changed Python Files
id: changed-files
@@ -33,4 +44,4 @@ jobs:
echo "${{ steps.changed-files.outputs.files }}" \
| tr ' ' '\n' \
| grep -v 'src/crewai/cli/templates/' \
| xargs -I{} ruff check "{}"
| xargs -I{} uv run ruff check "{}"

View File

@@ -10,14 +10,20 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
python-version: "3.11.9"
enable-cache: true
cache-dependency-glob: |
**/pyproject.toml
**/uv.lock
- name: Set up Python
run: uv python install 3.11
- name: Install dependencies
run: pip install bandit
run: uv sync --dev --no-install-project
- name: Run Bandit
run: bandit -c pyproject.toml -r src/ -ll
run: uv run bandit -c pyproject.toml -r src/ -ll

View File

@@ -24,7 +24,7 @@ jobs:
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v3
uses: astral-sh/setup-uv@v6
with:
enable-cache: true
cache-dependency-glob: |

View File

@@ -6,21 +6,78 @@ permissions:
contents: write
jobs:
type-checker:
type-checker-matrix:
name: type-checker (${{ matrix.python-version }})
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ['3.10', '3.11', '3.12', '3.13']
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.11.9"
fetch-depth: 0 # Fetch all history for proper diff
- name: Install Requirements
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
enable-cache: true
cache-dependency-glob: |
**/pyproject.toml
**/uv.lock
- name: Set up Python ${{ matrix.python-version }}
run: uv python install ${{ matrix.python-version }}
- name: Install dependencies
run: uv sync --dev --no-install-project
- name: Get changed Python files
id: changed-files
run: |
pip install mypy
# Get the list of changed Python files compared to the base branch
echo "Fetching changed files..."
git diff --name-only --diff-filter=ACMRT origin/${{ github.base_ref }}...HEAD -- '*.py' > changed_files.txt
# Filter for files in src/ and tests/ directories
grep -E "^(src/|tests/)" changed_files.txt > filtered_changed_files.txt || true
# Check if there are any changed files
if [ -s filtered_changed_files.txt ]; then
echo "Changed Python files in src/ and tests/:"
cat filtered_changed_files.txt
echo "has_changes=true" >> $GITHUB_OUTPUT
# Convert newlines to spaces for mypy command
echo "files=$(cat filtered_changed_files.txt | tr '\n' ' ')" >> $GITHUB_OUTPUT
else
echo "No Python files changed in src/ or tests/"
echo "has_changes=false" >> $GITHUB_OUTPUT
fi
- name: Run type checks
run: mypy src
- name: Run type checks on changed files
if: steps.changed-files.outputs.has_changes == 'true'
run: |
echo "Running mypy on changed files with Python ${{ matrix.python-version }}..."
uv run mypy ${{ steps.changed-files.outputs.files }}
- name: No files to check
if: steps.changed-files.outputs.has_changes == 'false'
run: echo "No Python files in src/ or tests/ were modified - skipping type checks"
# Summary job to provide single status for branch protection
type-checker:
name: type-checker
runs-on: ubuntu-latest
needs: type-checker-matrix
if: always()
steps:
- name: Check matrix results
run: |
if [ "${{ needs.type-checker-matrix.result }}" == "success" ] || [ "${{ needs.type-checker-matrix.result }}" == "skipped" ]; then
echo "✅ All type checks passed"
else
echo "❌ Type checks failed"
exit 1
fi

View File

@@ -1,7 +1,14 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.2
rev: v0.12.11
hooks:
- id: ruff
args: ["--fix"]
args: ["--config", "pyproject.toml"]
- id: ruff-format
args: ["--config", "pyproject.toml"]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.17.1
hooks:
- id: mypy
args: ["--config-file", "pyproject.toml"]

View File

@@ -1,4 +0,0 @@
exclude = [
"templates",
"__init__.py",
]

View File

@@ -418,10 +418,10 @@ Choose CrewAI to easily build powerful, adaptable, and production-ready AI autom
You can test different real life examples of AI crews in the [CrewAI-examples repo](https://github.com/crewAIInc/crewAI-examples?tab=readme-ov-file):
- [Landing Page Generator](https://github.com/crewAIInc/crewAI-examples/tree/main/landing_page_generator)
- [Landing Page Generator](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/landing_page_generator)
- [Having Human input on the execution](https://docs.crewai.com/how-to/Human-Input-on-Execution)
- [Trip Planner](https://github.com/crewAIInc/crewAI-examples/tree/main/trip_planner)
- [Stock Analysis](https://github.com/crewAIInc/crewAI-examples/tree/main/stock_analysis)
- [Trip Planner](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/trip_planner)
- [Stock Analysis](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/stock_analysis)
### Quick Tutorial
@@ -429,19 +429,19 @@ You can test different real life examples of AI crews in the [CrewAI-examples re
### Write Job Descriptions
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/job-posting) or watch a video below:
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/job-posting) or watch a video below:
[![Jobs postings](https://img.youtube.com/vi/u98wEMz-9to/maxresdefault.jpg)](https://www.youtube.com/watch?v=u98wEMz-9to "Jobs postings")
### Trip Planner
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/trip_planner) or watch a video below:
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/trip_planner) or watch a video below:
[![Trip Planner](https://img.youtube.com/vi/xis7rWp-hjs/maxresdefault.jpg)](https://www.youtube.com/watch?v=xis7rWp-hjs "Trip Planner")
### Stock Analysis
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/stock_analysis) or watch a video below:
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/stock_analysis) or watch a video below:
[![Stock Analysis](https://img.youtube.com/vi/e0Uj4yWdaAg/maxresdefault.jpg)](https://www.youtube.com/watch?v=e0Uj4yWdaAg "Stock Analysis")

View File

@@ -44,12 +44,12 @@ To create a custom event listener, you need to:
Here's a simple example of a custom event listener class:
```python
from crewai.utilities.events import (
from crewai.events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
AgentExecutionCompletedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def __init__(self):
@@ -146,7 +146,7 @@ my_project/
```python
# my_custom_listener.py
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
# ... import events ...
class MyCustomListener(BaseEventListener):
@@ -279,7 +279,7 @@ Additional fields vary by event type. For example, `CrewKickoffCompletedEvent` i
For temporary event handling (useful for testing or specific operations), you can use the `scoped_handlers` context manager:
```python
from crewai.utilities.events import crewai_event_bus, CrewKickoffStartedEvent
from crewai.events import crewai_event_bus, CrewKickoffStartedEvent
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStartedEvent)

View File

@@ -681,11 +681,11 @@ CrewAI emits events during the knowledge retrieval process that you can listen f
#### Example: Monitoring Knowledge Retrieval
```python
from crewai.utilities.events import (
from crewai.events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
BaseEventListener,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
class KnowledgeMonitorListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):

View File

@@ -733,10 +733,10 @@ CrewAI supports streaming responses from LLMs, allowing your application to rece
CrewAI emits events for each chunk received during streaming:
```python
from crewai.utilities.events import (
from crewai.events import (
LLMStreamChunkEvent
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):
@@ -758,8 +758,8 @@ CrewAI supports streaming responses from LLMs, allowing your application to rece
```python
from crewai import LLM, Agent, Task, Crew
from crewai.utilities.events import LLMStreamChunkEvent
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import LLMStreamChunkEvent
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):

View File

@@ -1041,8 +1041,8 @@ CrewAI emits the following memory-related events:
Track memory operation timing to optimize your application:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent
)
@@ -1076,8 +1076,8 @@ memory_monitor = MemoryPerformanceMonitor()
Log memory operations for debugging and insights:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemorySaveStartedEvent,
MemoryQueryStartedEvent,
MemoryRetrievalCompletedEvent
@@ -1117,8 +1117,8 @@ memory_logger = MemoryLogger()
Capture and respond to memory errors:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemorySaveFailedEvent,
MemoryQueryFailedEvent
)
@@ -1167,8 +1167,8 @@ error_tracker = MemoryErrorTracker(notify_email="admin@example.com")
Memory events can be forwarded to analytics and monitoring platforms to track performance metrics, detect anomalies, and visualize memory usage patterns:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent
)

View File

@@ -59,6 +59,7 @@ crew = Crew(
| **Output Pydantic** _(optional)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | A Pydantic model for task output. |
| **Callback** _(optional)_ | `callback` | `Optional[Any]` | Function/object to be executed after task completion. |
| **Guardrail** _(optional)_ | `guardrail` | `Optional[Callable]` | Function to validate task output before proceeding to next task. |
| **Guardrail Max Retries** _(optional)_ | `guardrail_max_retries` | `Optional[int]` | Maximum number of retries when guardrail validation fails. Defaults to 3. |
## Creating Tasks
@@ -452,7 +453,7 @@ task = Task(
expected_output="A valid JSON object",
agent=analyst,
guardrail=validate_json_output,
max_retries=3 # Limit retry attempts
guardrail_max_retries=3 # Limit retry attempts
)
```

View File

@@ -1,13 +1,13 @@
---
title: Weaviate Vector Search
description: The `WeaviateVectorSearchTool` is designed to search a Weaviate vector database for semantically similar documents.
description: The `WeaviateVectorSearchTool` is designed to search a Weaviate vector database for semantically similar documents using hybrid search.
icon: network-wired
---
## Overview
The `WeaviateVectorSearchTool` is specifically crafted for conducting semantic searches within documents stored in a Weaviate vector database. This tool allows you to find semantically similar documents to a given query, leveraging the power of vector embeddings for more accurate and contextually relevant search results.
The `WeaviateVectorSearchTool` is specifically crafted for conducting semantic searches within documents stored in a Weaviate vector database. This tool allows you to find semantically similar documents to a given query, leveraging the power of vector and keyword search for more accurate and contextually relevant search results.
[Weaviate](https://weaviate.io/) is a vector database that stores and queries vector embeddings, enabling semantic search capabilities.
@@ -39,6 +39,7 @@ from crewai_tools import WeaviateVectorSearchTool
tool = WeaviateVectorSearchTool(
collection_name='example_collections',
limit=3,
alpha=0.75,
weaviate_cluster_url="https://your-weaviate-cluster-url.com",
weaviate_api_key="your-weaviate-api-key",
)
@@ -63,6 +64,7 @@ The `WeaviateVectorSearchTool` accepts the following parameters:
- **weaviate_cluster_url**: Required. The URL of the Weaviate cluster.
- **weaviate_api_key**: Required. The API key for the Weaviate cluster.
- **limit**: Optional. The number of results to return. Default is `3`.
- **alpha**: Optional. Controls the weighting between vector and keyword (BM25) search. alpha = 0 -> BM25 only, alpha = 1 -> vector search only. Default is `0.75`.
- **vectorizer**: Optional. The vectorizer to use. If not provided, it will use `text2vec_openai` with the `nomic-embed-text` model.
- **generative_model**: Optional. The generative model to use. If not provided, it will use OpenAI's `gpt-4o`.
@@ -78,6 +80,7 @@ from weaviate.classes.config import Configure
tool = WeaviateVectorSearchTool(
collection_name='example_collections',
limit=3,
alpha=0.75,
vectorizer=Configure.Vectorizer.text2vec_openai(model="nomic-embed-text"),
generative_model=Configure.Generative.openai(model="gpt-4o-mini"),
weaviate_cluster_url="https://your-weaviate-cluster-url.com",
@@ -128,6 +131,7 @@ with test_docs.batch.dynamic() as batch:
tool = WeaviateVectorSearchTool(
collection_name='example_collections',
limit=3,
alpha=0.75,
weaviate_cluster_url="https://your-weaviate-cluster-url.com",
weaviate_api_key="your-weaviate-api-key",
)
@@ -145,6 +149,7 @@ from crewai_tools import WeaviateVectorSearchTool
weaviate_tool = WeaviateVectorSearchTool(
collection_name='example_collections',
limit=3,
alpha=0.75,
weaviate_cluster_url="https://your-weaviate-cluster-url.com",
weaviate_api_key="your-weaviate-api-key",
)

View File

@@ -44,12 +44,12 @@ Prompt Tracing을 통해 다음과 같은 작업이 가능합니다:
아래는 커스텀 이벤트 리스너 클래스의 간단한 예시입니다:
```python
from crewai.utilities.events import (
from crewai.events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
AgentExecutionCompletedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def __init__(self):
@@ -146,7 +146,7 @@ my_project/
```python
# my_custom_listener.py
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
# ... import events ...
class MyCustomListener(BaseEventListener):
@@ -279,7 +279,7 @@ CrewAI는 여러분이 청취할 수 있는 다양한 이벤트를 제공합니
임시 이벤트 처리가 필요한 경우(테스트 또는 특정 작업에 유용함), `scoped_handlers` 컨텍스트 관리자를 사용할 수 있습니다:
```python
from crewai.utilities.events import crewai_event_bus, CrewKickoffStartedEvent
from crewai.events import crewai_event_bus, CrewKickoffStartedEvent
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStartedEvent)

View File

@@ -683,11 +683,11 @@ CrewAI는 knowledge 검색 과정에서 이벤트를 발생시키며, 이벤트
#### 예시: Knowledge Retrieval 모니터링
```python
from crewai.utilities.events import (
from crewai.events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
BaseEventListener,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
class KnowledgeMonitorListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):

View File

@@ -731,10 +731,10 @@ CrewAI는 LLM의 스트리밍 응답을 지원하여, 애플리케이션이 출
CrewAI는 스트리밍 중 수신되는 각 청크에 대해 이벤트를 발생시킵니다:
```python
from crewai.utilities.events import (
from crewai.events import (
LLMStreamChunkEvent
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):
@@ -756,8 +756,8 @@ CrewAI는 LLM의 스트리밍 응답을 지원하여, 애플리케이션이 출
```python
from crewai import LLM, Agent, Task, Crew
from crewai.utilities.events import LLMStreamChunkEvent
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import LLMStreamChunkEvent
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):

View File

@@ -985,8 +985,8 @@ CrewAI는 다음과 같은 메모리 관련 이벤트를 발생시킵니다:
애플리케이션을 최적화하기 위해 메모리 작업 타이밍을 추적하세요:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent
)
@@ -1020,8 +1020,8 @@ memory_monitor = MemoryPerformanceMonitor()
디버깅 및 인사이트를 위해 메모리 작업을 로깅합니다:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemorySaveStartedEvent,
MemoryQueryStartedEvent,
MemoryRetrievalCompletedEvent
@@ -1061,8 +1061,8 @@ memory_logger = MemoryLogger()
메모리 오류를 캡처하고 대응합니다:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemorySaveFailedEvent,
MemoryQueryFailedEvent
)
@@ -1111,8 +1111,8 @@ error_tracker = MemoryErrorTracker(notify_email="admin@example.com")
메모리 이벤트는 분석 및 모니터링 플랫폼으로 전달되어 성능 지표를 추적하고, 이상 징후를 감지하며, 메모리 사용 패턴을 시각화할 수 있습니다:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent
)

View File

@@ -59,6 +59,7 @@ crew = Crew(
| **Pydantic 출력** _(선택 사항)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | 태스크 출력용 Pydantic 모델입니다. |
| **콜백** _(선택 사항)_ | `callback` | `Optional[Any]` | 태스크 완료 후 실행할 함수/객체입니다. |
| **가드레일** _(선택 사항)_ | `guardrail` | `Optional[Callable]` | 다음 태스크로 진행하기 전에 태스크 출력을 검증하는 함수입니다. |
| **가드레일 최대 재시도** _(선택 사항)_ | `guardrail_max_retries` | `Optional[int]` | 가드레일 검증 실패 시 최대 재시도 횟수입니다. 기본값은 3입니다. |
## 작업 생성하기
@@ -448,7 +449,7 @@ task = Task(
expected_output="A valid JSON object",
agent=analyst,
guardrail=validate_json_output,
max_retries=3 # Limit retry attempts
guardrail_max_retries=3 # 재시도 횟수 제한
)
```
@@ -899,4 +900,4 @@ except RuntimeError as e:
작업(task)은 CrewAI 에이전트의 행동을 이끄는 원동력입니다.
작업과 그 결과를 적절하게 정의함으로써, 에이전트가 독립적으로 또는 협업 단위로 효과적으로 작동할 수 있는 기반을 마련할 수 있습니다.
작업에 적합한 도구를 장착하고, 실행 과정을 이해하며, 견고한 검증 절차를 따르는 것은 CrewAI의 잠재력을 극대화하는 데 필수적입니다.
이를 통해 에이전트가 할당된 작업에 효과적으로 준비되고, 작업이 의도대로 수행될 수 있습니다.
이를 통해 에이전트가 할당된 작업에 효과적으로 준비되고, 작업이 의도대로 수행될 수 있습니다.

View File

@@ -44,12 +44,12 @@ Para criar um listener de evento personalizado, você precisa:
Veja um exemplo simples de uma classe de listener de evento personalizado:
```python
from crewai.utilities.events import (
from crewai.events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
AgentExecutionCompletedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
class MeuListenerPersonalizado(BaseEventListener):
def __init__(self):
@@ -146,7 +146,7 @@ my_project/
```python
# my_custom_listener.py
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
# ... importe events ...
class MyCustomListener(BaseEventListener):
@@ -268,7 +268,7 @@ Campos adicionais variam pelo tipo de evento. Por exemplo, `CrewKickoffCompleted
Para lidar temporariamente com eventos (útil para testes ou operações específicas), você pode usar o context manager `scoped_handlers`:
```python
from crewai.utilities.events import crewai_event_bus, CrewKickoffStartedEvent
from crewai.events import crewai_event_bus, CrewKickoffStartedEvent
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStartedEvent)

View File

@@ -681,11 +681,11 @@ O CrewAI emite eventos durante o processo de recuperação de knowledge que voc
#### Exemplo: Monitorando Recuperação de Knowledge
```python
from crewai.utilities.events import (
from crewai.events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
BaseEventListener,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
class KnowledgeMonitorListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):

View File

@@ -708,10 +708,10 @@ O CrewAI suporta respostas em streaming de LLMs, permitindo que sua aplicação
O CrewAI emite eventos para cada chunk recebido durante o streaming:
```python
from crewai.utilities.events import (
from crewai.events import (
LLMStreamChunkEvent
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):

View File

@@ -59,6 +59,7 @@ crew = Crew(
| **Output Pydantic** _(opcional)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | Um modelo Pydantic para a saída da tarefa. |
| **Callback** _(opcional)_ | `callback` | `Optional[Any]` | Função/objeto a ser executado após a conclusão da tarefa. |
| **Guardrail** _(opcional)_ | `guardrail` | `Optional[Callable]` | Função para validar a saída da tarefa antes de prosseguir para a próxima tarefa. |
| **Max Tentativas Guardrail** _(opcional)_ | `guardrail_max_retries` | `Optional[int]` | Número máximo de tentativas quando a validação do guardrail falha. Padrão é 3. |
## Criando Tarefas
@@ -450,7 +451,7 @@ task = Task(
expected_output="Um objeto JSON válido",
agent=analyst,
guardrail=validate_json_output,
max_retries=3 # Limite de tentativas
guardrail_max_retries=3 # Limite de tentativas
)
```
@@ -935,7 +936,7 @@ task = Task(
description="Gerar dados",
expected_output="Dados válidos",
guardrail=validate_data,
max_retries=5 # Sobrescreve o limite padrão de tentativas
guardrail_max_retries=5 # Sobrescreve o limite padrão de tentativas
)
```

View File

@@ -48,7 +48,7 @@ Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools~=0.62.1"]
tools = ["crewai-tools~=0.65.0"]
embeddings = [
"tiktoken~=0.8.0"
]
@@ -68,12 +68,16 @@ docling = [
aisuite = [
"aisuite>=0.1.10",
]
qdrant = [
"qdrant-client[fastembed]>=1.14.3",
]
[tool.uv]
dev-dependencies = [
"ruff>=0.8.2",
"mypy>=1.10.0",
"pre-commit>=3.6.0",
"ruff>=0.12.11",
"mypy>=1.17.1",
"pre-commit>=4.3.0",
"bandit>=1.8.6",
"pillow>=10.2.0",
"cairosvg>=2.7.1",
"pytest>=8.0.0",
@@ -85,15 +89,43 @@ dev-dependencies = [
"pytest-timeout>=2.3.1",
"pytest-xdist>=3.6.1",
"pytest-split>=0.9.0",
"types-requests==2.32.*",
"types-pyyaml==6.0.*",
"types-regex==2024.11.6.*",
"types-appdirs==1.4.*",
]
[project.scripts]
crewai = "crewai.cli.cli:crewai"
[tool.ruff]
exclude = [
"src/crewai/cli/templates",
]
fix = true
[tool.ruff.lint]
select = [
"B006",
"UP006",
"UP007",
"UP035",
"UP037",
"UP040",
"UP004",
"UP008",
"UP010",
"UP018",
"UP031",
"UP032",
"UP034",
"I001",
"I002",
]
[tool.mypy]
ignore_missing_imports = true
disable_error_code = 'import-untyped'
exclude = ["cli/templates"]
strict = true
exclude = ["src/crewai/cli/templates"]
[tool.bandit]
exclude_dirs = ["src/crewai/cli/templates"]

View File

@@ -54,7 +54,7 @@ def _track_install_async():
_track_install_async()
__version__ = "0.165.1"
__version__ = "0.175.0"
__all__ = [
"Agent",
"Crew",

View File

@@ -3,16 +3,15 @@ import subprocess
import time
from typing import (
Any,
Callable,
Dict,
List,
Literal,
Optional,
Sequence,
Tuple,
Type,
Union,
)
from collections.abc import Callable, Sequence
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -38,17 +37,21 @@ from crewai.utilities.agent_utils import (
)
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
from crewai.utilities.events.agent_events import (
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
)
from crewai.utilities.events.knowledge_events import (
from crewai.utilities.exceptions import (
LLMContextLengthExceededException,
LLMQuotaLimitExceededException,
)
from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
@@ -320,15 +323,20 @@ class Agent(BaseAgent):
event=MemoryRetrievalStartedEvent(
task_id=str(task.id) if task else None,
source_type="agent",
from_agent=self,
from_task=task,
),
)
start_time = time.time()
contextual_memory = ContextualMemory(
self.crew._short_term_memory,
self.crew._long_term_memory,
self.crew._entity_memory,
self.crew._external_memory,
agent=self,
task=task,
)
memory = contextual_memory.build_context_for_task(task, context)
if memory.strip() != "":
@@ -341,6 +349,8 @@ class Agent(BaseAgent):
memory_content=memory,
retrieval_time_ms=(time.time() - start_time) * 1000,
source_type="agent",
from_agent=self,
from_task=task,
),
)
knowledge_config = (
@@ -454,6 +464,26 @@ class Agent(BaseAgent):
),
)
raise e
except LLMContextLengthExceededException as e:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
except LLMQuotaLimitExceededException as e:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors

View File

@@ -1,5 +1,5 @@
from .cache.cache_handler import CacheHandler
from .parser import CrewAgentParser
from .tools_handler import ToolsHandler
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.parser import parse, AgentAction, AgentFinish, OutputParserException
from crewai.agents.tools_handler import ToolsHandler
__all__ = ["CacheHandler", "CrewAgentParser", "ToolsHandler"]
__all__ = ["CacheHandler", "parse", "AgentAction", "AgentFinish", "OutputParserException", "ToolsHandler"]

View File

@@ -1,4 +1,4 @@
from typing import Any, AsyncIterable, Dict, List, Optional
from typing import Any, Dict, List, Optional
from pydantic import Field, PrivateAttr
@@ -14,15 +14,14 @@ from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool
from crewai.utilities import Logger
from crewai.utilities.converter import Converter
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.agent_events import (
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
try:
from langchain_core.messages import ToolMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent

View File

@@ -10,8 +10,8 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Logger
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.agent_events import (
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,

View File

@@ -7,7 +7,7 @@ from crewai.utilities import I18N
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.printer import Printer
from crewai.utilities.events.event_listener import event_listener
from crewai.events.event_listener import event_listener
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -43,7 +43,6 @@ class CrewAgentExecutorMixin:
metadata={
"observation": self.task.description,
},
agent=self.agent.role,
)
except Exception as e:
print(f"Failed to add to short term memory: {e}")
@@ -65,7 +64,6 @@ class CrewAgentExecutorMixin:
"description": self.task.description,
"messages": self.messages,
},
agent=self.agent.role,
)
except Exception as e:
print(f"Failed to add to external memory: {e}")
@@ -100,8 +98,8 @@ class CrewAgentExecutorMixin:
)
self.crew._long_term_memory.save(long_term_memory)
for entity in evaluation.entities:
entity_memory = EntityMemoryItem(
entity_memories = [
EntityMemoryItem(
name=entity.name,
type=entity.type,
description=entity.description,
@@ -109,7 +107,10 @@ class CrewAgentExecutorMixin:
[f"- {r}" for r in entity.relationships]
),
)
self.crew._entity_memory.save(entity_memory)
for entity in evaluation.entities
]
if entity_memories:
self.crew._entity_memory.save(entity_memories)
except AttributeError as e:
print(f"Missing attributes for long term memory: {e}")
pass
@@ -158,7 +159,9 @@ class CrewAgentExecutorMixin:
self._printer.print(content=prompt, color="bold_yellow")
response = input()
if response.strip() != "":
self._printer.print(content="\nProcessing your feedback...", color="cyan")
self._printer.print(
content="\nProcessing your feedback...", color="cyan"
)
return response
finally:
event_listener.formatter.resume_live_updates()

View File

@@ -0,0 +1,27 @@
"""Constants for agent-related modules."""
import re
from typing import Final
# crewai.agents.parser constants
FINAL_ANSWER_ACTION: Final[str] = "Final Answer:"
MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE: Final[str] = (
"I did it wrong. Invalid Format: I missed the 'Action:' after 'Thought:'. I will do right next, and don't use a tool I have already used.\n"
)
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE: Final[str] = (
"I did it wrong. Invalid Format: I missed the 'Action Input:' after 'Action:'. I will do right next, and don't use a tool I have already used.\n"
)
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE: Final[str] = (
"I did it wrong. Tried to both perform Action and give a Final Answer at the same time, I must do one or the other"
)
UNABLE_TO_REPAIR_JSON_RESULTS: Final[list[str]] = ['""', "{}"]
ACTION_INPUT_REGEX: Final[re.Pattern[str]] = re.compile(
r"Action\s*\d*\s*:\s*(.*?)\s*Action\s*\d*\s*Input\s*\d*\s*:\s*(.*)", re.DOTALL
)
ACTION_REGEX: Final[re.Pattern[str]] = re.compile(
r"Action\s*\d*\s*:\s*(.*?)", re.DOTALL
)
ACTION_INPUT_ONLY_REGEX: Final[re.Pattern[str]] = re.compile(
r"\s*Action\s*\d*\s*Input\s*\d*\s*:\s*(.*)", re.DOTALL
)

View File

@@ -1,4 +1,5 @@
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union
from collections.abc import Callable
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
@@ -30,11 +31,15 @@ from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.logger import Logger
from crewai.utilities.tool_utils import execute_tool_and_check_finality
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.utilities.events.agent_events import (
from crewai.events.types.logging_events import (
AgentLogsStartedEvent,
AgentLogsExecutionEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.events.event_bus import crewai_event_bus
from crewai.utilities.exceptions import (
LLMContextLengthExceededException,
LLMQuotaLimitExceededException,
)
class CrewAgentExecutor(CrewAgentExecutorMixin):
@@ -54,11 +59,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
tools_description: str,
tools_handler: ToolsHandler,
step_callback: Any = None,
original_tools: List[Any] = [],
original_tools: List[Any] | None = None,
function_calling_llm: Any = None,
respect_context_window: bool = False,
request_within_rpm_limit: Optional[Callable[[], bool]] = None,
callbacks: List[Any] = [],
callbacks: List[Any] | None = None,
):
self._i18n: I18N = I18N()
self.llm: BaseLLM = llm
@@ -70,10 +75,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.tools_names = tools_names
self.stop = stop_words
self.max_iter = max_iter
self.callbacks = callbacks
self.callbacks = callbacks or []
self._printer: Printer = Printer()
self.tools_handler = tools_handler
self.original_tools = original_tools
self.original_tools = original_tools or []
self.step_callback = step_callback
self.use_stop_words = self.llm.supports_stop_words()
self.tools_description = tools_description
@@ -122,7 +127,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
handle_unknown_error(self._printer, e)
raise
if self.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
@@ -156,7 +160,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
from_task=self.task
from_task=self.task,
)
formatted_answer = process_llm_response(answer, self.use_stop_words)
@@ -202,6 +206,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
printer=self._printer,
)
except LLMContextLengthExceededException as e:
raise e
except LLMQuotaLimitExceededException as e:
raise e
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors

View File

@@ -1,50 +1,67 @@
import re
from typing import Any, Optional, Union
"""Agent output parsing module for ReAct-style LLM responses.
This module provides parsing functionality for agent outputs that follow
the ReAct (Reasoning and Acting) format, converting them into structured
AgentAction or AgentFinish objects.
"""
from dataclasses import dataclass
from json_repair import repair_json
from crewai.agents.constants import (
ACTION_INPUT_REGEX,
ACTION_REGEX,
ACTION_INPUT_ONLY_REGEX,
FINAL_ANSWER_ACTION,
MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE,
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
UNABLE_TO_REPAIR_JSON_RESULTS,
)
from crewai.utilities import I18N
FINAL_ANSWER_ACTION = "Final Answer:"
MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE = "I did it wrong. Invalid Format: I missed the 'Action:' after 'Thought:'. I will do right next, and don't use a tool I have already used.\n"
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE = "I did it wrong. Invalid Format: I missed the 'Action Input:' after 'Action:'. I will do right next, and don't use a tool I have already used.\n"
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE = "I did it wrong. Tried to both perform Action and give a Final Answer at the same time, I must do one or the other"
_I18N = I18N()
@dataclass
class AgentAction:
"""Represents an action to be taken by an agent."""
thought: str
tool: str
tool_input: str
text: str
result: str
def __init__(self, thought: str, tool: str, tool_input: str, text: str):
self.thought = thought
self.tool = tool
self.tool_input = tool_input
self.text = text
result: str | None = None
@dataclass
class AgentFinish:
"""Represents the final answer from an agent."""
thought: str
output: str
text: str
def __init__(self, thought: str, output: str, text: str):
self.thought = thought
self.output = output
self.text = text
class OutputParserException(Exception):
error: str
"""Exception raised when output parsing fails.
def __init__(self, error: str):
Attributes:
error: The error message.
"""
def __init__(self, error: str) -> None:
"""Initialize OutputParserException.
Args:
error: The error message.
"""
self.error = error
super().__init__(error)
class CrewAgentParser:
"""Parses ReAct-style LLM calls that have a single tool input.
def parse(text: str) -> AgentAction | AgentFinish:
"""Parse agent output text into AgentAction or AgentFinish.
Expects output to be in one of two formats.
@@ -62,108 +79,117 @@ class CrewAgentParser:
Thought: agent thought here
Final Answer: The temperature is 100 degrees
Args:
text: The agent output text to parse.
Returns:
AgentAction or AgentFinish based on the content.
Raises:
OutputParserException: If the text format is invalid.
"""
thought = _extract_thought(text)
includes_answer = FINAL_ANSWER_ACTION in text
action_match = ACTION_INPUT_REGEX.search(text)
_i18n: I18N = I18N()
agent: Any = None
if includes_answer:
final_answer = text.split(FINAL_ANSWER_ACTION)[-1].strip()
# Check whether the final answer ends with triple backticks.
if final_answer.endswith("```"):
# Count occurrences of triple backticks in the final answer.
count = final_answer.count("```")
# If count is odd then it's an unmatched trailing set; remove it.
if count % 2 != 0:
final_answer = final_answer[:-3].rstrip()
return AgentFinish(thought=thought, output=final_answer, text=text)
def __init__(self, agent: Optional[Any] = None):
self.agent = agent
elif action_match:
action = action_match.group(1)
clean_action = _clean_action(action)
@staticmethod
def parse_text(text: str) -> Union[AgentAction, AgentFinish]:
"""
Static method to parse text into an AgentAction or AgentFinish without needing to instantiate the class.
action_input = action_match.group(2).strip()
Args:
text: The text to parse.
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = _safe_repair_json(tool_input)
Returns:
Either an AgentAction or AgentFinish based on the parsed content.
"""
parser = CrewAgentParser()
return parser.parse(text)
def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
thought = self._extract_thought(text)
includes_answer = FINAL_ANSWER_ACTION in text
regex = (
r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
return AgentAction(
thought=thought, tool=clean_action, tool_input=safe_tool_input, text=text
)
action_match = re.search(regex, text, re.DOTALL)
if includes_answer:
final_answer = text.split(FINAL_ANSWER_ACTION)[-1].strip()
# Check whether the final answer ends with triple backticks.
if final_answer.endswith("```"):
# Count occurrences of triple backticks in the final answer.
count = final_answer.count("```")
# If count is odd then it's an unmatched trailing set; remove it.
if count % 2 != 0:
final_answer = final_answer[:-3].rstrip()
return AgentFinish(thought, final_answer, text)
elif action_match:
action = action_match.group(1)
clean_action = self._clean_action(action)
if not ACTION_REGEX.search(text):
raise OutputParserException(
f"{MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE}\n{_I18N.slice('final_answer_format')}",
)
elif not ACTION_INPUT_ONLY_REGEX.search(text):
raise OutputParserException(
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
)
else:
err_format = _I18N.slice("format_without_tools")
error = f"{err_format}"
raise OutputParserException(
error,
)
action_input = action_match.group(2).strip()
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = self._safe_repair_json(tool_input)
def _extract_thought(text: str) -> str:
"""Extract the thought portion from the text.
return AgentAction(thought, clean_action, safe_tool_input, text)
Args:
text: The full agent output text.
if not re.search(r"Action\s*\d*\s*:[\s]*(.*?)", text, re.DOTALL):
raise OutputParserException(
f"{MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE}\n{self._i18n.slice('final_answer_format')}",
)
elif not re.search(
r"[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)", text, re.DOTALL
):
raise OutputParserException(
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
)
else:
format = self._i18n.slice("format_without_tools")
error = f"{format}"
raise OutputParserException(
error,
)
Returns:
The extracted thought string.
"""
thought_index = text.find("\nAction")
if thought_index == -1:
thought_index = text.find("\nFinal Answer")
if thought_index == -1:
return ""
thought = text[:thought_index].strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
def _extract_thought(self, text: str) -> str:
thought_index = text.find("\nAction")
if thought_index == -1:
thought_index = text.find("\nFinal Answer")
if thought_index == -1:
return ""
thought = text[:thought_index].strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
def _clean_action(self, text: str) -> str:
"""Clean action string by removing non-essential formatting characters."""
return text.strip().strip("*").strip()
def _clean_action(text: str) -> str:
"""Clean action string by removing non-essential formatting characters.
def _safe_repair_json(self, tool_input: str) -> str:
UNABLE_TO_REPAIR_JSON_RESULTS = ['""', "{}"]
Args:
text: The action text to clean.
# Skip repair if the input starts and ends with square brackets
# Explanation: The JSON parser has issues handling inputs that are enclosed in square brackets ('[]').
# These are typically valid JSON arrays or strings that do not require repair. Attempting to repair such inputs
# might lead to unintended alterations, such as wrapping the entire input in additional layers or modifying
# the structure in a way that changes its meaning. By skipping the repair for inputs that start and end with
# square brackets, we preserve the integrity of these valid JSON structures and avoid unnecessary modifications.
if tool_input.startswith("[") and tool_input.endswith("]"):
return tool_input
Returns:
The cleaned action string.
"""
return text.strip().strip("*").strip()
# Before repair, handle common LLM issues:
# 1. Replace """ with " to avoid JSON parser errors
tool_input = tool_input.replace('"""', '"')
def _safe_repair_json(tool_input: str) -> str:
"""Safely repair JSON input.
result = repair_json(tool_input)
if result in UNABLE_TO_REPAIR_JSON_RESULTS:
return tool_input
Args:
tool_input: The tool input string to repair.
return str(result)
Returns:
The repaired JSON string or original if repair fails.
"""
# Skip repair if the input starts and ends with square brackets
# Explanation: The JSON parser has issues handling inputs that are enclosed in square brackets ('[]').
# These are typically valid JSON arrays or strings that do not require repair. Attempting to repair such inputs
# might lead to unintended alterations, such as wrapping the entire input in additional layers or modifying
# the structure in a way that changes its meaning. By skipping the repair for inputs that start and end with
# square brackets, we preserve the integrity of these valid JSON structures and avoid unnecessary modifications.
if tool_input.startswith("[") and tool_input.endswith("]"):
return tool_input
# Before repair, handle common LLM issues:
# 1. Replace """ with " to avoid JSON parser errors
tool_input = tool_input.replace('"""', '"')
result = repair_json(tool_input)
if result in UNABLE_TO_REPAIR_JSON_RESULTS:
return tool_input
return str(result)

View File

@@ -1,29 +1,41 @@
from typing import Any, Optional, Union
"""Tools handler for managing tool execution and caching."""
from ..tools.cache_tools.cache_tools import CacheTools
from ..tools.tool_calling import InstructorToolCalling, ToolCalling
from .cache.cache_handler import CacheHandler
from crewai.tools.cache_tools.cache_tools import CacheTools
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
from crewai.agents.cache.cache_handler import CacheHandler
class ToolsHandler:
"""Callback handler for tool usage."""
"""Callback handler for tool usage.
last_used_tool: Optional[ToolCalling] = None
cache: Optional[CacheHandler]
Attributes:
last_used_tool: The most recently used tool calling instance.
cache: Optional cache handler for storing tool outputs.
"""
def __init__(self, cache: Optional[CacheHandler] = None):
"""Initialize the callback handler."""
self.cache = cache
self.last_used_tool = None
def __init__(self, cache: CacheHandler | None = None) -> None:
"""Initialize the callback handler.
Args:
cache: Optional cache handler for storing tool outputs.
"""
self.cache: CacheHandler | None = cache
self.last_used_tool: ToolCalling | InstructorToolCalling | None = None
def on_tool_use(
self,
calling: Union[ToolCalling, InstructorToolCalling],
calling: ToolCalling | InstructorToolCalling,
output: str,
should_cache: bool = True,
) -> Any:
"""Run when tool ends running."""
self.last_used_tool = calling # type: ignore # BUG?: Incompatible types in assignment (expression has type "Union[ToolCalling, InstructorToolCalling]", variable has type "ToolCalling")
) -> None:
"""Run when tool ends running.
Args:
calling: The tool calling instance.
output: The output from the tool execution.
should_cache: Whether to cache the tool output.
"""
self.last_used_tool = calling
if self.cache and should_cache and calling.tool_name != CacheTools().name:
self.cache.add(
tool=calling.tool_name,

View File

@@ -1,6 +1 @@
ALGORITHMS = ["RS256"]
#TODO: The AUTH0 constants should be removed after WorkOS migration is completed
AUTH0_DOMAIN = "crewai.us.auth0.com"
AUTH0_CLIENT_ID = "DEVC5Fw6NlRoSzmDCcOhVq85EfLBjKa8"
AUTH0_AUDIENCE = "https://crewai.us.auth0.com/api/v2/"

View File

@@ -7,24 +7,27 @@ from rich.console import Console
from pydantic import BaseModel, Field
from .utils import TokenManager, validate_jwt_token
from urllib.parse import quote
from crewai.cli.plus_api import PlusAPI
from .utils import validate_jwt_token
from crewai.cli.shared.token_manager import TokenManager
from crewai.cli.config import Settings
from crewai.cli.authentication.constants import (
AUTH0_AUDIENCE,
AUTH0_CLIENT_ID,
AUTH0_DOMAIN,
)
console = Console()
class Oauth2Settings(BaseModel):
provider: str = Field(description="OAuth2 provider used for authentication (e.g., workos, okta, auth0).")
client_id: str = Field(description="OAuth2 client ID issued by the provider, used during authentication requests.")
domain: str = Field(description="OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens.")
audience: Optional[str] = Field(description="OAuth2 audience value, typically used to identify the target API or resource.", default=None)
provider: str = Field(
description="OAuth2 provider used for authentication (e.g., workos, okta, auth0)."
)
client_id: str = Field(
description="OAuth2 client ID issued by the provider, used during authentication requests."
)
domain: str = Field(
description="OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens."
)
audience: Optional[str] = Field(
description="OAuth2 audience value, typically used to identify the target API or resource.",
default=None,
)
@classmethod
def from_settings(cls):
@@ -44,11 +47,15 @@ class ProviderFactory:
settings = settings or Oauth2Settings.from_settings()
import importlib
module = importlib.import_module(f"crewai.cli.authentication.providers.{settings.provider.lower()}")
module = importlib.import_module(
f"crewai.cli.authentication.providers.{settings.provider.lower()}"
)
provider = getattr(module, f"{settings.provider.capitalize()}Provider")
return provider(settings)
class AuthenticationCommand:
def __init__(self):
self.token_manager = TokenManager()
@@ -58,26 +65,12 @@ class AuthenticationCommand:
"""Sign up to CrewAI+"""
console.print("Signing in to CrewAI Enterprise...\n", style="bold blue")
# TODO: WORKOS - Next line and conditional are temporary until migration to WorkOS is complete.
user_provider = self._determine_user_provider()
if user_provider == "auth0":
settings = Oauth2Settings(
provider="auth0",
client_id=AUTH0_CLIENT_ID,
domain=AUTH0_DOMAIN,
audience=AUTH0_AUDIENCE
)
self.oauth2_provider = ProviderFactory.from_settings(settings)
# End of temporary code.
device_code_data = self._get_device_code()
self._display_auth_instructions(device_code_data)
return self._poll_for_token(device_code_data)
def _get_device_code(
self
) -> Dict[str, Any]:
def _get_device_code(self) -> Dict[str, Any]:
"""Get the device code to authenticate the user."""
device_code_payload = {
@@ -86,7 +79,9 @@ class AuthenticationCommand:
"audience": self.oauth2_provider.get_audience(),
}
response = requests.post(
url=self.oauth2_provider.get_authorize_url(), data=device_code_payload, timeout=20
url=self.oauth2_provider.get_authorize_url(),
data=device_code_payload,
timeout=20,
)
response.raise_for_status()
return response.json()
@@ -97,9 +92,7 @@ class AuthenticationCommand:
console.print("2. Enter the following code: ", device_code_data["user_code"])
webbrowser.open(device_code_data["verification_uri_complete"])
def _poll_for_token(
self, device_code_data: Dict[str, Any]
) -> None:
def _poll_for_token(self, device_code_data: Dict[str, Any]) -> None:
"""Polls the server for the token until it is received, or max attempts are reached."""
token_payload = {
@@ -112,7 +105,9 @@ class AuthenticationCommand:
attempts = 0
while True and attempts < 10:
response = requests.post(self.oauth2_provider.get_token_url(), data=token_payload, timeout=30)
response = requests.post(
self.oauth2_provider.get_token_url(), data=token_payload, timeout=30
)
token_data = response.json()
if response.status_code == 200:
@@ -192,30 +187,3 @@ class AuthenticationCommand:
"\nRun [bold]crewai login[/bold] to try logging in again.\n",
style="yellow",
)
# TODO: WORKOS - This method is temporary until migration to WorkOS is complete.
def _determine_user_provider(self) -> str:
"""Determine which provider to use for authentication."""
console.print(
"Enter your CrewAI Enterprise account email: ", style="bold blue", end=""
)
email = input()
email_encoded = quote(email)
# It's not correct to call this method directly, but it's temporary until migration is complete.
response = PlusAPI("")._make_request(
"GET", f"/crewai_plus/api/v1/me/provider?email={email_encoded}"
)
if response.status_code == 200:
if response.json().get("provider") == "auth0":
return "auth0"
else:
return "workos"
else:
console.print(
"Error: Failed to authenticate with crewai enterprise. Ensure that you are using the latest crewai version and please try again. If the problem persists, contact support@crewai.com.",
style="red",
)
raise SystemExit

View File

@@ -1,4 +1,4 @@
from .utils import TokenManager
from crewai.cli.shared.token_manager import TokenManager
class AuthError(Exception):

View File

@@ -1,12 +1,5 @@
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
import jwt
from jwt import PyJWKClient
from cryptography.fernet import Fernet
def validate_jwt_token(
@@ -67,118 +60,3 @@ def validate_jwt_token(
raise Exception(f"JWKS or key processing error: {str(e)}")
except jwt.InvalidTokenError as e:
raise Exception(f"Invalid token: {str(e)}")
class TokenManager:
def __init__(self, file_path: str = "tokens.enc") -> None:
"""
Initialize the TokenManager class.
:param file_path: The file path to store the encrypted tokens. Default is "tokens.enc".
"""
self.file_path = file_path
self.key = self._get_or_create_key()
self.fernet = Fernet(self.key)
def _get_or_create_key(self) -> bytes:
"""
Get or create the encryption key.
:return: The encryption key.
"""
key_filename = "secret.key"
key = self.read_secure_file(key_filename)
if key is not None:
return key
new_key = Fernet.generate_key()
self.save_secure_file(key_filename, new_key)
return new_key
def save_tokens(self, access_token: str, expires_at: int) -> None:
"""
Save the access token and its expiration time.
:param access_token: The access token to save.
:param expires_at: The UNIX timestamp of the expiration time.
"""
expiration_time = datetime.fromtimestamp(expires_at)
data = {
"access_token": access_token,
"expiration": expiration_time.isoformat(),
}
encrypted_data = self.fernet.encrypt(json.dumps(data).encode())
self.save_secure_file(self.file_path, encrypted_data)
def get_token(self) -> Optional[str]:
"""
Get the access token if it is valid and not expired.
:return: The access token if valid and not expired, otherwise None.
"""
encrypted_data = self.read_secure_file(self.file_path)
decrypted_data = self.fernet.decrypt(encrypted_data) # type: ignore
data = json.loads(decrypted_data)
expiration = datetime.fromisoformat(data["expiration"])
if expiration <= datetime.now():
return None
return data["access_token"]
def get_secure_storage_path(self) -> Path:
"""
Get the secure storage path based on the operating system.
:return: The secure storage path.
"""
if sys.platform == "win32":
# Windows: Use %LOCALAPPDATA%
base_path = os.environ.get("LOCALAPPDATA")
elif sys.platform == "darwin":
# macOS: Use ~/Library/Application Support
base_path = os.path.expanduser("~/Library/Application Support")
else:
# Linux and other Unix-like: Use ~/.local/share
base_path = os.path.expanduser("~/.local/share")
app_name = "crewai/credentials"
storage_path = Path(base_path) / app_name
storage_path.mkdir(parents=True, exist_ok=True)
return storage_path
def save_secure_file(self, filename: str, content: bytes) -> None:
"""
Save the content to a secure file.
:param filename: The name of the file.
:param content: The content to save.
"""
storage_path = self.get_secure_storage_path()
file_path = storage_path / filename
with open(file_path, "wb") as f:
f.write(content)
# Set appropriate permissions (read/write for owner only)
os.chmod(file_path, 0o600)
def read_secure_file(self, filename: str) -> Optional[bytes]:
"""
Read the content of a secure file.
:param filename: The name of the file.
:return: The content of the file if it exists, otherwise None.
"""
storage_path = self.get_secure_storage_path()
file_path = storage_path / filename
if not file_path.exists():
return None
with open(file_path, "rb") as f:
return f.read()

View File

@@ -11,6 +11,7 @@ from crewai.cli.constants import (
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID,
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN,
)
from crewai.cli.shared.token_manager import TokenManager
DEFAULT_CONFIG_PATH = Path.home() / ".config" / "crewai" / "settings.json"
@@ -53,6 +54,7 @@ HIDDEN_SETTINGS_KEYS = [
"tool_repository_password",
]
class Settings(BaseModel):
enterprise_base_url: Optional[str] = Field(
default=DEFAULT_CLI_SETTINGS["enterprise_base_url"],
@@ -74,12 +76,12 @@ class Settings(BaseModel):
oauth2_provider: str = Field(
description="OAuth2 provider used for authentication (e.g., workos, okta, auth0).",
default=DEFAULT_CLI_SETTINGS["oauth2_provider"]
default=DEFAULT_CLI_SETTINGS["oauth2_provider"],
)
oauth2_audience: Optional[str] = Field(
description="OAuth2 audience value, typically used to identify the target API or resource.",
default=DEFAULT_CLI_SETTINGS["oauth2_audience"]
default=DEFAULT_CLI_SETTINGS["oauth2_audience"],
)
oauth2_client_id: str = Field(
@@ -89,7 +91,7 @@ class Settings(BaseModel):
oauth2_domain: str = Field(
description="OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens.",
default=DEFAULT_CLI_SETTINGS["oauth2_domain"]
default=DEFAULT_CLI_SETTINGS["oauth2_domain"],
)
def __init__(self, config_path: Path = DEFAULT_CONFIG_PATH, **data):
@@ -116,6 +118,7 @@ class Settings(BaseModel):
"""Reset all settings to default values"""
self._reset_user_settings()
self._reset_cli_settings()
self._clear_auth_tokens()
self.dump()
def dump(self) -> None:
@@ -139,3 +142,7 @@ class Settings(BaseModel):
"""Reset all CLI settings to default values"""
for key in CLI_SETTINGS_KEYS:
setattr(self, key, DEFAULT_CLI_SETTINGS.get(key))
def _clear_auth_tokens(self) -> None:
"""Clear all authentication tokens"""
TokenManager().clear_tokens()

View File

@@ -117,17 +117,19 @@ class PlusAPI:
def get_organizations(self) -> requests.Response:
return self._make_request("GET", self.ORGANIZATIONS_RESOURCE)
def send_trace_batch(self, payload) -> requests.Response:
return self._make_request("POST", self.TRACING_RESOURCE, json=payload)
def initialize_trace_batch(self, payload) -> requests.Response:
return self._make_request(
"POST", f"{self.TRACING_RESOURCE}/batches", json=payload
"POST",
f"{self.TRACING_RESOURCE}/batches",
json=payload,
timeout=30,
)
def initialize_ephemeral_trace_batch(self, payload) -> requests.Response:
return self._make_request(
"POST", f"{self.EPHEMERAL_TRACING_RESOURCE}/batches", json=payload
"POST",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches",
json=payload,
)
def send_trace_events(self, trace_batch_id: str, payload) -> requests.Response:
@@ -135,6 +137,7 @@ class PlusAPI:
"POST",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/events",
json=payload,
timeout=30,
)
def send_ephemeral_trace_events(
@@ -144,6 +147,7 @@ class PlusAPI:
"POST",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/events",
json=payload,
timeout=30,
)
def finalize_trace_batch(self, trace_batch_id: str, payload) -> requests.Response:
@@ -151,6 +155,7 @@ class PlusAPI:
"PATCH",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
timeout=30,
)
def finalize_ephemeral_trace_batch(
@@ -160,4 +165,5 @@ class PlusAPI:
"PATCH",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
timeout=30,
)

View File

@@ -10,8 +10,9 @@ console = Console()
class SettingsCommand(BaseCommand):
"""A class to handle CLI configuration commands."""
def __init__(self, settings_kwargs: dict[str, Any] = {}):
def __init__(self, settings_kwargs: dict[str, Any] | None = None):
super().__init__()
settings_kwargs = settings_kwargs or {}
self.settings = Settings(**settings_kwargs)
def list(self) -> None:

View File

@@ -0,0 +1,141 @@
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
from cryptography.fernet import Fernet
class TokenManager:
def __init__(self, file_path: str = "tokens.enc") -> None:
"""
Initialize the TokenManager class.
:param file_path: The file path to store the encrypted tokens. Default is "tokens.enc".
"""
self.file_path = file_path
self.key = self._get_or_create_key()
self.fernet = Fernet(self.key)
def _get_or_create_key(self) -> bytes:
"""
Get or create the encryption key.
:return: The encryption key.
"""
key_filename = "secret.key"
key = self.read_secure_file(key_filename)
if key is not None:
return key
new_key = Fernet.generate_key()
self.save_secure_file(key_filename, new_key)
return new_key
def save_tokens(self, access_token: str, expires_at: int) -> None:
"""
Save the access token and its expiration time.
:param access_token: The access token to save.
:param expires_at: The UNIX timestamp of the expiration time.
"""
expiration_time = datetime.fromtimestamp(expires_at)
data = {
"access_token": access_token,
"expiration": expiration_time.isoformat(),
}
encrypted_data = self.fernet.encrypt(json.dumps(data).encode())
self.save_secure_file(self.file_path, encrypted_data)
def get_token(self) -> Optional[str]:
"""
Get the access token if it is valid and not expired.
:return: The access token if valid and not expired, otherwise None.
"""
encrypted_data = self.read_secure_file(self.file_path)
if encrypted_data is None:
return None
decrypted_data = self.fernet.decrypt(encrypted_data) # type: ignore
data = json.loads(decrypted_data)
expiration = datetime.fromisoformat(data["expiration"])
if expiration <= datetime.now():
return None
return data["access_token"]
def clear_tokens(self) -> None:
"""
Clear the tokens.
"""
self.delete_secure_file(self.file_path)
def get_secure_storage_path(self) -> Path:
"""
Get the secure storage path based on the operating system.
:return: The secure storage path.
"""
if sys.platform == "win32":
# Windows: Use %LOCALAPPDATA%
base_path = os.environ.get("LOCALAPPDATA")
elif sys.platform == "darwin":
# macOS: Use ~/Library/Application Support
base_path = os.path.expanduser("~/Library/Application Support")
else:
# Linux and other Unix-like: Use ~/.local/share
base_path = os.path.expanduser("~/.local/share")
app_name = "crewai/credentials"
storage_path = Path(base_path) / app_name
storage_path.mkdir(parents=True, exist_ok=True)
return storage_path
def save_secure_file(self, filename: str, content: bytes) -> None:
"""
Save the content to a secure file.
:param filename: The name of the file.
:param content: The content to save.
"""
storage_path = self.get_secure_storage_path()
file_path = storage_path / filename
with open(file_path, "wb") as f:
f.write(content)
# Set appropriate permissions (read/write for owner only)
os.chmod(file_path, 0o600)
def read_secure_file(self, filename: str) -> Optional[bytes]:
"""
Read the content of a secure file.
:param filename: The name of the file.
:return: The content of the file if it exists, otherwise None.
"""
storage_path = self.get_secure_storage_path()
file_path = storage_path / filename
if not file_path.exists():
return None
with open(file_path, "rb") as f:
return f.read()
def delete_secure_file(self, filename: str) -> None:
"""
Delete the secure file.
:param filename: The name of the file.
"""
storage_path = self.get_secure_storage_path()
file_path = storage_path / filename
if file_path.exists():
file_path.unlink(missing_ok=True)

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.165.1,<1.0.0"
"crewai[tools]>=0.175.0,<1.0.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.165.1,<1.0.0",
"crewai[tools]>=0.175.0,<1.0.0",
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.165.1"
"crewai[tools]>=0.175.0"
]
[tool.crewai]

View File

@@ -59,7 +59,7 @@ from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.events.crew_events import (
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
@@ -70,14 +70,14 @@ from crewai.utilities.events.crew_events import (
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.event_listener import EventListener
from crewai.utilities.events.listeners.tracing.trace_listener import (
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.utilities.events.listeners.tracing.utils import (
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled,
)
from crewai.utilities.formatter import (
@@ -559,9 +559,10 @@ class Crew(FlowTrackable, BaseModel):
CrewTrainingHandler(filename).initialize_file()
def train(
self, n_iterations: int, filename: str, inputs: Optional[Dict[str, Any]] = {}
self, n_iterations: int, filename: str, inputs: Optional[Dict[str, Any]] = None
) -> None:
"""Trains the crew for a given number of iterations."""
inputs = inputs or {}
try:
crewai_event_bus.emit(
self,
@@ -702,8 +703,11 @@ class Crew(FlowTrackable, BaseModel):
self._task_output_handler.reset()
return results
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = {}) -> CrewOutput:
async def kickoff_async(
self, inputs: Optional[Dict[str, Any]] = None
) -> CrewOutput:
"""Asynchronous kickoff method to start the crew execution."""
inputs = inputs or {}
return await asyncio.to_thread(self.kickoff, inputs)
async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[CrewOutput]:

View File

@@ -0,0 +1,56 @@
"""CrewAI events system for monitoring and extending agent behavior.
This module provides the event infrastructure that allows users to:
- Monitor agent, task, and crew execution
- Track memory operations and performance
- Build custom logging and analytics
- Extend CrewAI with custom event handlers
"""
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent,
MemorySaveStartedEvent,
MemoryQueryStartedEvent,
MemoryRetrievalCompletedEvent,
MemorySaveFailedEvent,
MemoryQueryFailedEvent,
)
from crewai.events.types.knowledge_events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
)
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
)
from crewai.events.types.llm_events import (
LLMStreamChunkEvent,
)
__all__ = [
"BaseEventListener",
"crewai_event_bus",
"MemoryQueryCompletedEvent",
"MemorySaveCompletedEvent",
"MemorySaveStartedEvent",
"MemoryQueryStartedEvent",
"MemoryRetrievalCompletedEvent",
"MemorySaveFailedEvent",
"MemoryQueryFailedEvent",
"KnowledgeRetrievalStartedEvent",
"KnowledgeRetrievalCompletedEvent",
"CrewKickoffStartedEvent",
"CrewKickoffCompletedEvent",
"AgentExecutionCompletedEvent",
"LLMStreamChunkEvent",
]

View File

@@ -0,0 +1,15 @@
from abc import ABC, abstractmethod
from crewai.events.event_bus import CrewAIEventsBus, crewai_event_bus
class BaseEventListener(ABC):
verbose: bool = False
def __init__(self):
super().__init__()
self.setup_listeners(crewai_event_bus)
@abstractmethod
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus):
pass

View File

@@ -11,7 +11,9 @@ class BaseEvent(BaseModel):
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"
source_type: Optional[str] = (
None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"
)
fingerprint_metadata: Optional[Dict[str, Any]] = None # Any relevant metadata
def to_json(self, exclude: set[str] | None = None):
@@ -25,3 +27,20 @@ class BaseEvent(BaseModel):
dict: A JSON-serializable dictionary.
"""
return to_serializable(self, exclude=exclude)
def _set_task_params(self, data: Dict[str, Any]):
if "from_task" in data and (task := data["from_task"]):
self.task_id = task.id
self.task_name = task.name or task.description
self.from_task = None
def _set_agent_params(self, data: Dict[str, Any]):
task = data.get("from_task", None)
agent = task.agent if task else data.get("from_agent", None)
if not agent:
return
self.agent_id = agent.id
self.agent_role = agent.role
self.from_agent = None

View File

@@ -0,0 +1,117 @@
from __future__ import annotations
import threading
from contextlib import contextmanager
from typing import Any, Callable, Dict, List, Type, TypeVar, cast
from blinker import Signal
from crewai.events.base_events import BaseEvent
from crewai.events.event_types import EventTypes
EventT = TypeVar("EventT", bound=BaseEvent)
class CrewAIEventsBus:
"""
A singleton event bus that uses blinker signals for event handling.
Allows both internal (Flow/Crew) and external event handling.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None: # prevent race condition
cls._instance = super(CrewAIEventsBus, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self) -> None:
"""Initialize the event bus internal state"""
self._signal = Signal("crewai_event_bus")
self._handlers: Dict[Type[BaseEvent], List[Callable]] = {}
def on(
self, event_type: Type[EventT]
) -> Callable[[Callable[[Any, EventT], None]], Callable[[Any, EventT], None]]:
"""
Decorator to register an event handler for a specific event type.
Usage:
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(
source: Any, event: AgentExecutionCompletedEvent
):
print(f"👍 Agent '{event.agent}' completed task")
print(f" Output: {event.output}")
"""
def decorator(
handler: Callable[[Any, EventT], None],
) -> Callable[[Any, EventT], None]:
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(
cast(Callable[[Any, EventT], None], handler)
)
return handler
return decorator
def emit(self, source: Any, event: BaseEvent) -> None:
"""
Emit an event to all registered handlers
Args:
source: The object emitting the event
event: The event instance to emit
"""
for event_type, handlers in self._handlers.items():
if isinstance(event, event_type):
for handler in handlers:
try:
handler(source, event)
except Exception as e:
print(
f"[EventBus Error] Handler '{handler.__name__}' failed for event '{event_type.__name__}': {e}"
)
self._signal.send(source, event=event)
def register_handler(
self, event_type: Type[EventTypes], handler: Callable[[Any, EventTypes], None]
) -> None:
"""Register an event handler for a specific event type"""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(
cast(Callable[[Any, EventTypes], None], handler)
)
@contextmanager
def scoped_handlers(self):
"""
Context manager for temporary event handling scope.
Useful for testing or temporary event handling.
Usage:
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStarted)
def temp_handler(source, event):
print("Temporary handler")
# Do stuff...
# Handlers are cleared after the context
"""
previous_handlers = self._handlers.copy()
self._handlers.clear()
try:
yield
finally:
self._handlers = previous_handlers
# Global instance
crewai_event_bus = CrewAIEventsBus()

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
from io import StringIO
from typing import Any, Dict
@@ -7,8 +9,8 @@ from crewai.task import Task
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import Logger
from crewai.utilities.constants import EMITTER_COLOR
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.knowledge_events import (
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
@@ -16,28 +18,30 @@ from crewai.utilities.events.knowledge_events import (
KnowledgeRetrievalStartedEvent,
KnowledgeSearchQueryFailedEvent,
)
from crewai.utilities.events.llm_events import (
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMStreamChunkEvent,
)
from crewai.utilities.events.llm_guardrail_events import (
from crewai.events.types.llm_guardrail_events import (
LLMGuardrailStartedEvent,
LLMGuardrailCompletedEvent,
)
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.events.utils.console_formatter import ConsoleFormatter
from .agent_events import (
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionStartedEvent,
AgentLogsStartedEvent,
AgentLogsExecutionEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from .crew_events import (
from crewai.events.types.logging_events import (
AgentLogsStartedEvent,
AgentLogsExecutionEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
@@ -49,7 +53,7 @@ from .crew_events import (
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from .flow_events import (
from .types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowStartedEvent,
@@ -57,13 +61,13 @@ from .flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from .task_events import TaskCompletedEvent, TaskFailedEvent, TaskStartedEvent
from .tool_usage_events import (
from .types.task_events import TaskCompletedEvent, TaskFailedEvent, TaskStartedEvent
from .types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from .reasoning_events import (
from .types.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
@@ -162,7 +166,7 @@ class EventListener(BaseEventListener):
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
self.execution_spans[source] = span
# Pass both task ID and task name (if set)
task_name = source.name if hasattr(source, 'name') and source.name else None
task_name = source.name if hasattr(source, "name") and source.name else None
self.formatter.create_task_branch(
self.formatter.current_crew_tree, source.id, task_name
)
@@ -176,13 +180,13 @@ class EventListener(BaseEventListener):
self.execution_spans[source] = None
# Pass task name if it exists
task_name = source.name if hasattr(source, 'name') and source.name else None
task_name = source.name if hasattr(source, "name") and source.name else None
self.formatter.update_task_status(
self.formatter.current_crew_tree,
source.id,
source.agent.role,
"completed",
task_name
task_name,
)
@crewai_event_bus.on(TaskFailedEvent)
@@ -194,13 +198,13 @@ class EventListener(BaseEventListener):
self.execution_spans[source] = None
# Pass task name if it exists
task_name = source.name if hasattr(source, 'name') and source.name else None
task_name = source.name if hasattr(source, "name") and source.name else None
self.formatter.update_task_status(
self.formatter.current_crew_tree,
source.id,
source.agent.role,
"failed",
task_name
task_name,
)
# ----------- AGENT EVENTS -----------

View File

@@ -1,12 +1,12 @@
from typing import Union
from .agent_events import (
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
)
from .crew_events import (
from .types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
@@ -17,39 +17,39 @@ from .crew_events import (
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from .flow_events import (
from .types.flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from .llm_events import (
from .types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMStreamChunkEvent,
)
from .llm_guardrail_events import (
from .types.llm_guardrail_events import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from .task_events import (
from .types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from .tool_usage_events import (
from .types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from .reasoning_events import (
from .types.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
)
from .knowledge_events import (
from .types.knowledge_events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeQueryStartedEvent,
@@ -58,7 +58,7 @@ from .knowledge_events import (
KnowledgeSearchQueryFailedEvent,
)
from .memory_events import (
from .types.memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,

View File

@@ -0,0 +1,5 @@
"""Event listener implementations for CrewAI.
This module contains various event listener implementations
for handling memory, tracing, and other event-driven functionality.
"""

View File

@@ -1,5 +1,5 @@
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.memory_events import (
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.types.memory_events import (
MemoryRetrievalCompletedEvent,
MemoryRetrievalStartedEvent,
MemoryQueryFailedEvent,
@@ -9,8 +9,8 @@ from crewai.utilities.events.memory_events import (
MemorySaveFailedEvent,
)
class MemoryListener(BaseEventListener):
class MemoryListener(BaseEventListener):
def __init__(self, formatter):
super().__init__()
self.formatter = formatter
@@ -19,9 +19,7 @@ class MemoryListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(MemoryRetrievalStartedEvent)
def on_memory_retrieval_started(
source, event: MemoryRetrievalStartedEvent
):
def on_memory_retrieval_started(source, event: MemoryRetrievalStartedEvent):
if self.memory_retrieval_in_progress:
return
@@ -33,9 +31,7 @@ class MemoryListener(BaseEventListener):
)
@crewai_event_bus.on(MemoryRetrievalCompletedEvent)
def on_memory_retrieval_completed(
source, event: MemoryRetrievalCompletedEvent
):
def on_memory_retrieval_completed(source, event: MemoryRetrievalCompletedEvent):
if not self.memory_retrieval_in_progress:
return
@@ -44,7 +40,7 @@ class MemoryListener(BaseEventListener):
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.memory_content,
event.retrieval_time_ms
event.retrieval_time_ms,
)
@crewai_event_bus.on(MemoryQueryCompletedEvent)
@@ -107,4 +103,4 @@ class MemoryListener(BaseEventListener):
event.error,
event.source_type,
self.formatter.current_crew_tree,
)
)

View File

@@ -11,7 +11,7 @@ from crewai.cli.plus_api import PlusAPI
from rich.console import Console
from rich.panel import Panel
from crewai.utilities.events.listeners.tracing.types import TraceEvent
from crewai.events.listeners.tracing.types import TraceEvent
from logging import getLogger
logger = getLogger(__name__)
@@ -41,18 +41,21 @@ class TraceBatchManager:
"""Single responsibility: Manage batches and event buffering"""
is_current_batch_ephemeral: bool = False
trace_batch_id: Optional[str] = None
current_batch: Optional[TraceBatch] = None
event_buffer: List[TraceEvent] = []
execution_start_times: Dict[str, datetime] = {}
batch_owner_type: Optional[str] = None
batch_owner_id: Optional[str] = None
def __init__(self):
try:
self.plus_api = PlusAPI(api_key=get_auth_token())
self.plus_api = PlusAPI(
api_key=get_auth_token(),
)
except AuthError:
self.plus_api = PlusAPI(api_key="")
self.trace_batch_id: Optional[str] = None # Backend ID
self.current_batch: Optional[TraceBatch] = None
self.event_buffer: List[TraceEvent] = []
self.execution_start_times: Dict[str, datetime] = {}
def initialize_batch(
self,
user_context: Dict[str, str],
@@ -113,7 +116,13 @@ class TraceBatchManager:
else self.plus_api.initialize_trace_batch(payload)
)
if response.status_code == 201 or response.status_code == 200:
if response is None:
logger.warning(
"Trace batch initialization failed gracefully. Continuing without tracing."
)
return
if response.status_code in [201, 200]:
response_data = response.json()
self.trace_batch_id = (
response_data["trace_id"]
@@ -128,21 +137,23 @@ class TraceBatchManager:
)
console.print(panel)
else:
logger.error(
f"❌ Failed to initialize trace batch: {response.status_code} - {response.text}"
logger.warning(
f"Trace batch initialization returned status {response.status_code}. Continuing without tracing."
)
except Exception as e:
logger.error(f"❌ Error initializing trace batch: {str(e)}")
logger.warning(
f"Error initializing trace batch: {str(e)}. Continuing without tracing."
)
def add_event(self, trace_event: TraceEvent):
"""Add event to buffer"""
self.event_buffer.append(trace_event)
def _send_events_to_backend(self):
"""Send buffered events to backend"""
def _send_events_to_backend(self) -> int:
"""Send buffered events to backend with graceful failure handling"""
if not self.plus_api or not self.trace_batch_id or not self.event_buffer:
return
return 500
try:
payload = {
@@ -154,38 +165,48 @@ class TraceBatchManager:
},
}
if not self.trace_batch_id:
raise Exception("❌ Trace batch ID not found")
response = (
self.plus_api.send_ephemeral_trace_events(self.trace_batch_id, payload)
if self.is_current_batch_ephemeral
else self.plus_api.send_trace_events(self.trace_batch_id, payload)
)
if response.status_code == 200 or response.status_code == 201:
if response is None:
logger.warning("Failed to send trace events. Events will be lost.")
return 500
if response.status_code in [200, 201]:
self.event_buffer.clear()
return 200
else:
logger.error(
f"Failed to send events: {response.status_code} - {response.text}"
logger.warning(
f"Failed to send events: {response.status_code}. Events will be lost."
)
return 500
except Exception as e:
logger.error(f"❌ Error sending events to backend: {str(e)}")
logger.warning(
f"Error sending events to backend: {str(e)}. Events will be lost."
)
return 500
def finalize_batch(self) -> Optional[TraceBatch]:
"""Finalize batch and return it for sending"""
if not self.current_batch:
return None
self.current_batch.events = self.event_buffer.copy()
if self.event_buffer:
self._send_events_to_backend()
events_sent_to_backend_status = self._send_events_to_backend()
if events_sent_to_backend_status == 500:
return None
self._finalize_backend_batch()
self.current_batch.events = self.event_buffer.copy()
finalized_batch = self.current_batch
self.batch_owner_type = None
self.batch_owner_id = None
self.current_batch = None
self.event_buffer.clear()
self.trace_batch_id = None

View File

@@ -3,8 +3,8 @@ import uuid
from typing import Dict, Any, Optional
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.agent_events import (
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionStartedEvent,
@@ -12,34 +12,34 @@ from crewai.utilities.events.agent_events import (
LiteAgentExecutionErrorEvent,
AgentExecutionErrorEvent,
)
from crewai.utilities.events.listeners.tracing.types import TraceEvent
from crewai.utilities.events.reasoning_events import (
from crewai.events.listeners.tracing.types import TraceEvent
from crewai.events.types.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
)
from crewai.utilities.events.crew_events import (
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
)
from crewai.utilities.events.task_events import (
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.utilities.events.tool_usage_events import (
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.llm_events import (
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
)
from crewai.utilities.events.flow_events import (
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowStartedEvent,
FlowFinishedEvent,
@@ -48,7 +48,7 @@ from crewai.utilities.events.flow_events import (
MethodExecutionFailedEvent,
FlowPlotEvent,
)
from crewai.utilities.events.llm_guardrail_events import (
from crewai.events.types.llm_guardrail_events import (
LLMGuardrailStartedEvent,
LLMGuardrailCompletedEvent,
)
@@ -57,7 +57,7 @@ from crewai.utilities.serialization import to_serializable
from .trace_batch_manager import TraceBatchManager
from crewai.utilities.events.memory_events import (
from crewai.events.types.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
@@ -75,10 +75,18 @@ class TraceCollectionListener(BaseEventListener):
Trace collection listener that orchestrates trace collection
"""
complex_events = ["task_started", "llm_call_started", "llm_call_completed"]
complex_events = [
"task_started",
"task_completed",
"llm_call_started",
"llm_call_completed",
"agent_execution_started",
"agent_execution_completed",
]
_instance = None
_initialized = False
_listeners_setup = False
def __new__(cls, batch_manager=None):
if cls._instance is None:
@@ -116,10 +124,15 @@ class TraceCollectionListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):
"""Setup event listeners - delegates to specific handlers"""
if self._listeners_setup:
return
self._register_flow_event_handlers(crewai_event_bus)
self._register_context_event_handlers(crewai_event_bus)
self._register_action_event_handlers(crewai_event_bus)
self._listeners_setup = True
def _register_flow_event_handlers(self, event_bus):
"""Register handlers for flow events"""
@@ -148,7 +161,8 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event):
self._handle_trace_event("flow_finished", source, event)
self.batch_manager.finalize_batch()
if self.batch_manager.batch_owner_type == "flow":
self.batch_manager.finalize_batch()
@event_bus.on(FlowPlotEvent)
def on_flow_plot(source, event):
@@ -166,7 +180,8 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
self._handle_trace_event("crew_kickoff_completed", source, event)
self.batch_manager.finalize_batch()
if self.batch_manager.batch_owner_type == "crew":
self.batch_manager.finalize_batch()
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event):
@@ -218,7 +233,7 @@ class TraceCollectionListener(BaseEventListener):
self._handle_trace_event("llm_guardrail_completed", source, event)
def _register_action_event_handlers(self, event_bus):
"""Register handlers for action events (LLM calls, tool usage, memory)"""
"""Register handlers for action events (LLM calls, tool usage)"""
@event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event):
@@ -289,6 +304,9 @@ class TraceCollectionListener(BaseEventListener):
"crewai_version": get_crewai_version(),
}
self.batch_manager.batch_owner_type = "crew"
self.batch_manager.batch_owner_id = getattr(source, "id", str(uuid.uuid4()))
self._initialize_batch(user_context, execution_metadata)
def _initialize_flow_batch(self, source: Any, event: Any):
@@ -301,6 +319,9 @@ class TraceCollectionListener(BaseEventListener):
"execution_type": "flow",
}
self.batch_manager.batch_owner_type = "flow"
self.batch_manager.batch_owner_id = getattr(source, "id", str(uuid.uuid4()))
self._initialize_batch(user_context, execution_metadata)
def _initialize_batch(
@@ -358,12 +379,44 @@ class TraceCollectionListener(BaseEventListener):
return {
"task_description": event.task.description,
"expected_output": event.task.expected_output,
"task_name": event.task.name,
"task_name": event.task.name or event.task.description,
"context": event.context,
"agent": source.agent.role,
"agent_role": source.agent.role,
"task_id": str(event.task.id),
}
elif event_type == "task_completed":
return {
"task_description": event.task.description if event.task else None,
"task_name": event.task.name or event.task.description
if event.task
else None,
"task_id": str(event.task.id) if event.task else None,
"output_raw": event.output.raw if event.output else None,
"output_format": str(event.output.output_format)
if event.output
else None,
"agent_role": event.output.agent if event.output else None,
}
elif event_type == "agent_execution_started":
return {
"agent_role": event.agent.role,
"agent_goal": event.agent.goal,
"agent_backstory": event.agent.backstory,
}
elif event_type == "agent_execution_completed":
return {
"agent_role": event.agent.role,
"agent_goal": event.agent.goal,
"agent_backstory": event.agent.backstory,
}
elif event_type == "llm_call_started":
return self._safe_serialize_to_dict(event)
event_data = self._safe_serialize_to_dict(event)
event_data["task_name"] = (
event.task_name or event.task_description
if hasattr(event, "task_name") and event.task_name
else None
)
return event_data
elif event_type == "llm_call_completed":
return self._safe_serialize_to_dict(event)
else:

View File

@@ -0,0 +1,5 @@
"""Event type definitions for CrewAI.
This module contains all event types used throughout the CrewAI system
for monitoring and extending agent, crew, task, and tool execution.
"""

View File

@@ -1,13 +1,15 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
"""Agent-related events moved to break circular dependencies."""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Sequence, Union
from pydantic import model_validator
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from .base_events import BaseEvent
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.base_events import BaseEvent
class AgentExecutionStartedEvent(BaseEvent):
@@ -21,9 +23,9 @@ class AgentExecutionStartedEvent(BaseEvent):
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
@model_validator(mode="after")
def set_fingerprint_data(self):
"""Set fingerprint data from the agent if available."""
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
@@ -32,6 +34,7 @@ class AgentExecutionStartedEvent(BaseEvent):
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
return self
class AgentExecutionCompletedEvent(BaseEvent):
@@ -42,9 +45,11 @@ class AgentExecutionCompletedEvent(BaseEvent):
output: str
type: str = "agent_execution_completed"
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
model_config = {"arbitrary_types_allowed": True}
@model_validator(mode="after")
def set_fingerprint_data(self):
"""Set fingerprint data from the agent if available."""
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
@@ -53,6 +58,7 @@ class AgentExecutionCompletedEvent(BaseEvent):
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
return self
class AgentExecutionErrorEvent(BaseEvent):
@@ -63,9 +69,11 @@ class AgentExecutionErrorEvent(BaseEvent):
error: str
type: str = "agent_execution_error"
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
model_config = {"arbitrary_types_allowed": True}
@model_validator(mode="after")
def set_fingerprint_data(self):
"""Set fingerprint data from the agent if available."""
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
@@ -74,6 +82,7 @@ class AgentExecutionErrorEvent(BaseEvent):
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
return self
# New event classes for LiteAgent
@@ -104,26 +113,6 @@ class LiteAgentExecutionErrorEvent(BaseEvent):
type: str = "lite_agent_execution_error"
# New logging events
class AgentLogsStartedEvent(BaseEvent):
"""Event emitted when agent logs should be shown at start"""
agent_role: str
task_description: Optional[str] = None
verbose: bool = False
type: str = "agent_logs_started"
class AgentLogsExecutionEvent(BaseEvent):
"""Event emitted when agent logs should be shown during execution"""
agent_role: str
formatted_answer: Any
verbose: bool = False
type: str = "agent_logs_execution"
model_config = {"arbitrary_types_allowed": True}
# Agent Eval events
class AgentEvaluationStartedEvent(BaseEvent):
agent_id: str
@@ -132,6 +121,7 @@ class AgentEvaluationStartedEvent(BaseEvent):
iteration: int
type: str = "agent_evaluation_started"
class AgentEvaluationCompletedEvent(BaseEvent):
agent_id: str
agent_role: str
@@ -141,6 +131,7 @@ class AgentEvaluationCompletedEvent(BaseEvent):
score: Any
type: str = "agent_evaluation_completed"
class AgentEvaluationFailedEvent(BaseEvent):
agent_id: str
agent_role: str

View File

@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from crewai.utilities.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent
if TYPE_CHECKING:
from crewai.crew import Crew

View File

@@ -2,7 +2,7 @@ from typing import Any, Dict, Optional, Union
from pydantic import BaseModel, ConfigDict
from .base_events import BaseEvent
from crewai.events.base_events import BaseEvent
class FlowEvent(BaseEvent):

View File

@@ -1,10 +1,6 @@
from typing import TYPE_CHECKING, Any
from crewai.events.base_events import BaseEvent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.utilities.events.base_events import BaseEvent
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
class KnowledgeRetrievalStartedEvent(BaseEvent):
@@ -20,7 +16,7 @@ class KnowledgeRetrievalCompletedEvent(BaseEvent):
query: str
type: str = "knowledge_search_query_completed"
agent: BaseAgent
retrieved_knowledge: Any
retrieved_knowledge: str
class KnowledgeQueryStartedEvent(BaseEvent):

View File

@@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel
from crewai.utilities.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent
class LLMEventBase(BaseEvent):
@@ -13,26 +13,14 @@ class LLMEventBase(BaseEvent):
agent_id: Optional[str] = None
agent_role: Optional[str] = None
from_task: Optional[Any] = None
from_agent: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
self._set_agent_params(data)
self._set_task_params(data)
def _set_agent_params(self, data: Dict[str, Any]):
task = data.get("from_task", None)
agent = task.agent if task else data.get("from_agent", None)
if not agent:
return
self.agent_id = agent.id
self.agent_role = agent.role
def _set_task_params(self, data: Dict[str, Any]):
if "from_task" in data and (task := data["from_task"]):
self.task_id = task.id
self.task_name = task.name
class LLMCallType(Enum):
"""Type of LLM call being made"""

View File

@@ -1,7 +1,7 @@
from inspect import getsource
from typing import Any, Callable, Optional, Union
from crewai.utilities.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent
class LLMGuardrailStartedEvent(BaseEvent):

View File

@@ -0,0 +1,25 @@
"""Agent logging events that don't reference BaseAgent to avoid circular imports."""
from typing import Any, Optional
from crewai.events.base_events import BaseEvent
class AgentLogsStartedEvent(BaseEvent):
"""Event emitted when agent logs should be shown at start"""
agent_role: str
task_description: Optional[str] = None
verbose: bool = False
type: str = "agent_logs_started"
class AgentLogsExecutionEvent(BaseEvent):
"""Event emitted when agent logs should be shown during execution"""
agent_role: str
formatted_answer: Any
verbose: bool = False
type: str = "agent_logs_execution"
model_config = {"arbitrary_types_allowed": True}

View File

@@ -1,9 +1,26 @@
from typing import Any, Dict, Optional
from crewai.utilities.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent
class MemoryQueryStartedEvent(BaseEvent):
class MemoryBaseEvent(BaseEvent):
"""Base event for memory operations"""
type: str
task_id: Optional[str] = None
task_name: Optional[str] = None
from_task: Optional[Any] = None
from_agent: Optional[Any] = None
agent_role: Optional[str] = None
agent_id: Optional[str] = None
def __init__(self, **data):
super().__init__(**data)
self._set_agent_params(data)
self._set_task_params(data)
class MemoryQueryStartedEvent(MemoryBaseEvent):
"""Event emitted when a memory query is started"""
type: str = "memory_query_started"
@@ -12,7 +29,7 @@ class MemoryQueryStartedEvent(BaseEvent):
score_threshold: Optional[float] = None
class MemoryQueryCompletedEvent(BaseEvent):
class MemoryQueryCompletedEvent(MemoryBaseEvent):
"""Event emitted when a memory query is completed successfully"""
type: str = "memory_query_completed"
@@ -23,7 +40,7 @@ class MemoryQueryCompletedEvent(BaseEvent):
query_time_ms: float
class MemoryQueryFailedEvent(BaseEvent):
class MemoryQueryFailedEvent(MemoryBaseEvent):
"""Event emitted when a memory query fails"""
type: str = "memory_query_failed"
@@ -33,7 +50,7 @@ class MemoryQueryFailedEvent(BaseEvent):
error: str
class MemorySaveStartedEvent(BaseEvent):
class MemorySaveStartedEvent(MemoryBaseEvent):
"""Event emitted when a memory save operation is started"""
type: str = "memory_save_started"
@@ -42,7 +59,7 @@ class MemorySaveStartedEvent(BaseEvent):
agent_role: Optional[str] = None
class MemorySaveCompletedEvent(BaseEvent):
class MemorySaveCompletedEvent(MemoryBaseEvent):
"""Event emitted when a memory save operation is completed successfully"""
type: str = "memory_save_completed"
@@ -52,7 +69,7 @@ class MemorySaveCompletedEvent(BaseEvent):
save_time_ms: float
class MemorySaveFailedEvent(BaseEvent):
class MemorySaveFailedEvent(MemoryBaseEvent):
"""Event emitted when a memory save operation fails"""
type: str = "memory_save_failed"
@@ -62,14 +79,14 @@ class MemorySaveFailedEvent(BaseEvent):
error: str
class MemoryRetrievalStartedEvent(BaseEvent):
class MemoryRetrievalStartedEvent(MemoryBaseEvent):
"""Event emitted when memory retrieval for a task prompt starts"""
type: str = "memory_retrieval_started"
task_id: Optional[str] = None
class MemoryRetrievalCompletedEvent(BaseEvent):
class MemoryRetrievalCompletedEvent(MemoryBaseEvent):
"""Event emitted when memory retrieval for a task prompt completes successfully"""
type: str = "memory_retrieval_completed"

View File

@@ -0,0 +1,47 @@
from crewai.events.base_events import BaseEvent
from typing import Any, Optional
class ReasoningEvent(BaseEvent):
"""Base event for reasoning events."""
type: str
attempt: int = 1
agent_role: str
task_id: str
task_name: Optional[str] = None
from_task: Optional[Any] = None
agent_id: Optional[str] = None
from_agent: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
self._set_task_params(data)
self._set_agent_params(data)
class AgentReasoningStartedEvent(ReasoningEvent):
"""Event emitted when an agent starts reasoning about a task."""
type: str = "agent_reasoning_started"
agent_role: str
task_id: str
class AgentReasoningCompletedEvent(ReasoningEvent):
"""Event emitted when an agent finishes its reasoning process."""
type: str = "agent_reasoning_completed"
agent_role: str
task_id: str
plan: str
ready: bool
class AgentReasoningFailedEvent(ReasoningEvent):
"""Event emitted when the reasoning process fails."""
type: str = "agent_reasoning_failed"
agent_role: str
task_id: str
error: str

View File

@@ -1,7 +1,7 @@
from typing import Any, Optional
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent
class TaskStartedEvent(BaseEvent):

View File

@@ -1,7 +1,7 @@
from datetime import datetime
from typing import Any, Callable, Dict, Optional
from .base_events import BaseEvent
from crewai.events.base_events import BaseEvent
class ToolUsageEvent(BaseEvent):
@@ -9,17 +9,24 @@ class ToolUsageEvent(BaseEvent):
agent_key: Optional[str] = None
agent_role: Optional[str] = None
agent_id: Optional[str] = None
tool_name: str
tool_args: Dict[str, Any] | str
tool_class: Optional[str] = None
run_attempts: int | None = None
delegations: int | None = None
agent: Optional[Any] = None
task_name: Optional[str] = None
task_id: Optional[str] = None
from_task: Optional[Any] = None
from_agent: Optional[Any] = None
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
self._set_agent_params(data)
self._set_task_params(data)
# Set fingerprint data from the agent
if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str

View File

@@ -227,7 +227,7 @@ class ConsoleFormatter:
return None
task_content = Text()
# Display task name if available, otherwise just the ID
if task_name:
task_content.append("📋 Task: ", style="yellow bold")
@@ -235,7 +235,7 @@ class ConsoleFormatter:
task_content.append(f" (ID: {task_id})", style="yellow dim")
else:
task_content.append(f"📋 Task: {task_id}", style="yellow bold")
task_content.append("\nStatus: ", style="white")
task_content.append("Executing Task...", style="yellow dim")

View File

@@ -1,28 +1,42 @@
import threading
from typing import Any
from typing import Any, Optional
from crewai.experimental.evaluation.base_evaluator import AgentEvaluationResult, AggregationStrategy
from crewai.experimental.evaluation.base_evaluator import (
AgentEvaluationResult,
AggregationStrategy,
)
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.evaluation_display import EvaluationDisplayFormatter
from crewai.utilities.events.agent_events import AgentEvaluationStartedEvent, AgentEvaluationCompletedEvent, AgentEvaluationFailedEvent
from crewai.events.types.agent_events import (
AgentEvaluationStartedEvent,
AgentEvaluationCompletedEvent,
AgentEvaluationFailedEvent,
)
from crewai.experimental.evaluation import BaseEvaluator, create_evaluation_callbacks
from collections.abc import Sequence
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.events.task_events import TaskCompletedEvent
from crewai.utilities.events.agent_events import LiteAgentExecutionCompletedEvent
from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult, EvaluationScore, MetricCategory
from crewai.events.event_bus import crewai_event_bus
from crewai.events.utils.console_formatter import ConsoleFormatter
from crewai.events.types.task_events import TaskCompletedEvent
from crewai.events.types.agent_events import LiteAgentExecutionCompletedEvent
from crewai.experimental.evaluation.base_evaluator import (
AgentAggregatedEvaluationResult,
EvaluationScore,
MetricCategory,
)
class ExecutionState:
current_agent_id: Optional[str] = None
current_task_id: Optional[str] = None
def __init__(self):
self.traces = {}
self.current_agent_id: str | None = None
self.current_task_id: str | None = None
self.iteration = 1
self.iterations_results = {}
self.agent_evaluators = {}
class AgentEvaluator:
def __init__(
self,
@@ -45,27 +59,45 @@ class AgentEvaluator:
@property
def _execution_state(self) -> ExecutionState:
if not hasattr(self._thread_local, 'execution_state'):
if not hasattr(self._thread_local, "execution_state"):
self._thread_local.execution_state = ExecutionState()
return self._thread_local.execution_state
def _subscribe_to_events(self) -> None:
from typing import cast
crewai_event_bus.register_handler(TaskCompletedEvent, cast(Any, self._handle_task_completed))
crewai_event_bus.register_handler(LiteAgentExecutionCompletedEvent, cast(Any, self._handle_lite_agent_completed))
crewai_event_bus.register_handler(
TaskCompletedEvent, cast(Any, self._handle_task_completed)
)
crewai_event_bus.register_handler(
LiteAgentExecutionCompletedEvent,
cast(Any, self._handle_lite_agent_completed),
)
def _handle_task_completed(self, source: Any, event: TaskCompletedEvent) -> None:
assert event.task is not None
agent = event.task.agent
if agent and str(getattr(agent, 'id', 'unknown')) in self._execution_state.agent_evaluators:
self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=str(event.task.id))
if (
agent
and str(getattr(agent, "id", "unknown"))
in self._execution_state.agent_evaluators
):
self.emit_evaluation_started_event(
agent_role=agent.role,
agent_id=str(agent.id),
task_id=str(event.task.id),
)
state = ExecutionState()
state.current_agent_id = str(agent.id)
state.current_task_id = str(event.task.id)
assert state.current_agent_id is not None and state.current_task_id is not None
trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
assert (
state.current_agent_id is not None and state.current_task_id is not None
)
trace = self.callback.get_trace(
state.current_agent_id, state.current_task_id
)
if not trace:
return
@@ -75,19 +107,28 @@ class AgentEvaluator:
task=event.task,
execution_trace=trace,
final_output=event.output,
state=state
state=state,
)
current_iteration = self._execution_state.iteration
if current_iteration not in self._execution_state.iterations_results:
self._execution_state.iterations_results[current_iteration] = {}
if agent.role not in self._execution_state.iterations_results[current_iteration]:
self._execution_state.iterations_results[current_iteration][agent.role] = []
if (
agent.role
not in self._execution_state.iterations_results[current_iteration]
):
self._execution_state.iterations_results[current_iteration][
agent.role
] = []
self._execution_state.iterations_results[current_iteration][agent.role].append(result)
self._execution_state.iterations_results[current_iteration][
agent.role
].append(result)
def _handle_lite_agent_completed(self, source: object, event: LiteAgentExecutionCompletedEvent) -> None:
def _handle_lite_agent_completed(
self, source: object, event: LiteAgentExecutionCompletedEvent
) -> None:
agent_info = event.agent_info
agent_id = str(agent_info["id"])
@@ -105,8 +146,12 @@ class AgentEvaluator:
if not target_agent:
return
assert state.current_agent_id is not None and state.current_task_id is not None
trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
assert (
state.current_agent_id is not None and state.current_task_id is not None
)
trace = self.callback.get_trace(
state.current_agent_id, state.current_task_id
)
if not trace:
return
@@ -115,7 +160,7 @@ class AgentEvaluator:
agent=target_agent,
execution_trace=trace,
final_output=event.output,
state=state
state=state,
)
current_iteration = self._execution_state.iteration
@@ -123,10 +168,17 @@ class AgentEvaluator:
self._execution_state.iterations_results[current_iteration] = {}
agent_role = target_agent.role
if agent_role not in self._execution_state.iterations_results[current_iteration]:
self._execution_state.iterations_results[current_iteration][agent_role] = []
if (
agent_role
not in self._execution_state.iterations_results[current_iteration]
):
self._execution_state.iterations_results[current_iteration][
agent_role
] = []
self._execution_state.iterations_results[current_iteration][agent_role].append(result)
self._execution_state.iterations_results[current_iteration][
agent_role
].append(result)
def set_iteration(self, iteration: int) -> None:
self._execution_state.iteration = iteration
@@ -135,14 +187,26 @@ class AgentEvaluator:
self._execution_state.iterations_results = {}
def get_evaluation_results(self) -> dict[str, list[AgentEvaluationResult]]:
if self._execution_state.iterations_results and self._execution_state.iteration in self._execution_state.iterations_results:
return self._execution_state.iterations_results[self._execution_state.iteration]
if (
self._execution_state.iterations_results
and self._execution_state.iteration
in self._execution_state.iterations_results
):
return self._execution_state.iterations_results[
self._execution_state.iteration
]
return {}
def display_results_with_iterations(self) -> None:
self.display_formatter.display_summary_results(self._execution_state.iterations_results)
self.display_formatter.display_summary_results(
self._execution_state.iterations_results
)
def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = True) -> dict[str, AgentAggregatedEvaluationResult]:
def get_agent_evaluation(
self,
strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE,
include_evaluation_feedback: bool = True,
) -> dict[str, AgentAggregatedEvaluationResult]:
agent_results = {}
with crewai_event_bus.scoped_handlers():
task_results = self.get_evaluation_results()
@@ -156,13 +220,16 @@ class AgentEvaluator:
agent_id=agent_id,
agent_role=agent_role,
results=results,
strategy=strategy
strategy=strategy,
)
agent_results[agent_role] = aggregated_result
if self._execution_state.iterations_results and self._execution_state.iteration == max(self._execution_state.iterations_results.keys(), default=0):
if (
self._execution_state.iterations_results
and self._execution_state.iteration
== max(self._execution_state.iterations_results.keys(), default=0)
):
self.display_results_with_iterations()
if include_evaluation_feedback:
@@ -171,7 +238,9 @@ class AgentEvaluator:
return agent_results
def display_evaluation_with_feedback(self) -> None:
self.display_formatter.display_evaluation_with_feedback(self._execution_state.iterations_results)
self.display_formatter.display_evaluation_with_feedback(
self._execution_state.iterations_results
)
def evaluate(
self,
@@ -183,46 +252,91 @@ class AgentEvaluator:
) -> AgentEvaluationResult:
result = AgentEvaluationResult(
agent_id=state.current_agent_id or str(agent.id),
task_id=state.current_task_id or (str(task.id) if task else "unknown_task")
task_id=state.current_task_id or (str(task.id) if task else "unknown_task"),
)
assert self.evaluators is not None
task_id = str(task.id) if task else None
for evaluator in self.evaluators:
try:
self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id)
self.emit_evaluation_started_event(
agent_role=agent.role, agent_id=str(agent.id), task_id=task_id
)
score = evaluator.evaluate(
agent=agent,
task=task,
execution_trace=execution_trace,
final_output=final_output
final_output=final_output,
)
result.metrics[evaluator.metric_category] = score
self.emit_evaluation_completed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, metric_category=evaluator.metric_category, score=score)
self.emit_evaluation_completed_event(
agent_role=agent.role,
agent_id=str(agent.id),
task_id=task_id,
metric_category=evaluator.metric_category,
score=score,
)
except Exception as e:
self.emit_evaluation_failed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, error=str(e))
self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}")
self.emit_evaluation_failed_event(
agent_role=agent.role,
agent_id=str(agent.id),
task_id=task_id,
error=str(e),
)
self.console_formatter.print(
f"Error in {evaluator.metric_category.value} evaluator: {str(e)}"
)
return result
def emit_evaluation_started_event(self, agent_role: str, agent_id: str, task_id: str | None = None):
def emit_evaluation_started_event(
self, agent_role: str, agent_id: str, task_id: str | None = None
):
crewai_event_bus.emit(
self,
AgentEvaluationStartedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration)
AgentEvaluationStartedEvent(
agent_role=agent_role,
agent_id=agent_id,
task_id=task_id,
iteration=self._execution_state.iteration,
),
)
def emit_evaluation_completed_event(self, agent_role: str, agent_id: str, task_id: str | None = None, metric_category: MetricCategory | None = None, score: EvaluationScore | None = None):
def emit_evaluation_completed_event(
self,
agent_role: str,
agent_id: str,
task_id: str | None = None,
metric_category: MetricCategory | None = None,
score: EvaluationScore | None = None,
):
crewai_event_bus.emit(
self,
AgentEvaluationCompletedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, metric_category=metric_category, score=score)
AgentEvaluationCompletedEvent(
agent_role=agent_role,
agent_id=agent_id,
task_id=task_id,
iteration=self._execution_state.iteration,
metric_category=metric_category,
score=score,
),
)
def emit_evaluation_failed_event(self, agent_role: str, agent_id: str, error: str, task_id: str | None = None):
def emit_evaluation_failed_event(
self, agent_role: str, agent_id: str, error: str, task_id: str | None = None
):
crewai_event_bus.emit(
self,
AgentEvaluationFailedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, error=error)
AgentEvaluationFailedEvent(
agent_role=agent_role,
agent_id=agent_id,
task_id=task_id,
iteration=self._execution_state.iteration,
error=error,
),
)
def create_default_evaluator(agents: list[Agent], llm: None = None):
from crewai.experimental.evaluation import (
GoalAlignmentEvaluator,
@@ -230,7 +344,7 @@ def create_default_evaluator(agents: list[Agent], llm: None = None):
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator
ReasoningEfficiencyEvaluator,
)
evaluators = [

View File

@@ -3,18 +3,28 @@ from typing import Dict, Any, List
from rich.table import Table
from rich.box import HEAVY_EDGE, ROUNDED
from collections.abc import Sequence
from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy, AgentEvaluationResult, MetricCategory
from crewai.experimental.evaluation.base_evaluator import (
AgentAggregatedEvaluationResult,
AggregationStrategy,
AgentEvaluationResult,
MetricCategory,
)
from crewai.experimental.evaluation import EvaluationScore
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.llm_utils import create_llm
class EvaluationDisplayFormatter:
def __init__(self):
self.console_formatter = ConsoleFormatter()
def display_evaluation_with_feedback(self, iterations_results: Dict[int, Dict[str, List[Any]]]):
def display_evaluation_with_feedback(
self, iterations_results: Dict[int, Dict[str, List[Any]]]
):
if not iterations_results:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
self.console_formatter.print(
"[yellow]No evaluation results to display[/yellow]"
)
return
all_agent_roles: set[str] = set()
@@ -22,7 +32,9 @@ class EvaluationDisplayFormatter:
all_agent_roles.update(iter_results.keys())
for agent_role in sorted(all_agent_roles):
self.console_formatter.print(f"\n[bold cyan]Agent: {agent_role}[/bold cyan]")
self.console_formatter.print(
f"\n[bold cyan]Agent: {agent_role}[/bold cyan]"
)
for iter_num, results in sorted(iterations_results.items()):
if agent_role not in results or not results[agent_role]:
@@ -62,9 +74,7 @@ class EvaluationDisplayFormatter:
table.add_section()
table.add_row(
metric.title(),
score_text,
evaluation_score.feedback or ""
metric.title(), score_text, evaluation_score.feedback or ""
)
if aggregated_result.overall_score is not None:
@@ -82,19 +92,26 @@ class EvaluationDisplayFormatter:
table.add_row(
"Overall Score",
f"[{overall_color}]{overall_score:.1f}[/]",
"Overall agent evaluation score"
"Overall agent evaluation score",
)
self.console_formatter.print(table)
def display_summary_results(self, iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]]):
def display_summary_results(
self,
iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]],
):
if not iterations_results:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
self.console_formatter.print(
"[yellow]No evaluation results to display[/yellow]"
)
return
self.console_formatter.print("\n")
table = Table(title="Agent Performance Scores \n (1-10 Higher is better)", box=HEAVY_EDGE)
table = Table(
title="Agent Performance Scores \n (1-10 Higher is better)", box=HEAVY_EDGE
)
table.add_column("Agent/Metric", style="cyan")
@@ -123,11 +140,14 @@ class EvaluationDisplayFormatter:
agent_id=agent_id,
agent_role=agent_role,
results=agent_results,
strategy=AggregationStrategy.SIMPLE_AVERAGE
strategy=AggregationStrategy.SIMPLE_AVERAGE,
)
valid_scores = [score.score for score in aggregated_result.metrics.values()
if score.score is not None]
valid_scores = [
score.score
for score in aggregated_result.metrics.values()
if score.score is not None
]
if valid_scores:
avg_score = sum(valid_scores) / len(valid_scores)
agent_scores_by_iteration[iter_num] = avg_score
@@ -137,7 +157,9 @@ class EvaluationDisplayFormatter:
if not agent_scores_by_iteration:
continue
avg_across_iterations = sum(agent_scores_by_iteration.values()) / len(agent_scores_by_iteration)
avg_across_iterations = sum(agent_scores_by_iteration.values()) / len(
agent_scores_by_iteration
)
row = [f"[bold]{agent_role}[/bold]"]
@@ -178,9 +200,13 @@ class EvaluationDisplayFormatter:
row = [f" - {metric.title()}"]
for iter_num in sorted(iterations_results.keys()):
if (iter_num in agent_metrics_by_iteration and
metric in agent_metrics_by_iteration[iter_num]):
metric_score = agent_metrics_by_iteration[iter_num][metric].score
if (
iter_num in agent_metrics_by_iteration
and metric in agent_metrics_by_iteration[iter_num]
):
metric_score = agent_metrics_by_iteration[iter_num][
metric
].score
if metric_score is not None:
metric_scores.append(metric_score)
if metric_score >= 8.0:
@@ -225,7 +251,9 @@ class EvaluationDisplayFormatter:
results: Sequence[AgentEvaluationResult],
strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE,
) -> AgentAggregatedEvaluationResult:
metrics_by_category: dict[MetricCategory, list[EvaluationScore]] = defaultdict(list)
metrics_by_category: dict[MetricCategory, list[EvaluationScore]] = defaultdict(
list
)
for result in results:
for metric_name, evaluation_score in result.metrics.items():
@@ -246,19 +274,20 @@ class EvaluationDisplayFormatter:
metric=category.title(),
feedbacks=feedbacks,
scores=[s.score for s in scores],
strategy=strategy
strategy=strategy,
)
else:
feedback_summary = feedbacks[0]
aggregated_metrics[category] = EvaluationScore(
score=avg_score,
feedback=feedback_summary
score=avg_score, feedback=feedback_summary
)
overall_score = None
if aggregated_metrics:
valid_scores = [m.score for m in aggregated_metrics.values() if m.score is not None]
valid_scores = [
m.score for m in aggregated_metrics.values() if m.score is not None
]
if valid_scores:
overall_score = sum(valid_scores) / len(valid_scores)
@@ -268,7 +297,7 @@ class EvaluationDisplayFormatter:
metrics=aggregated_metrics,
overall_score=overall_score,
task_count=len(results),
aggregation_strategy=strategy
aggregation_strategy=strategy,
)
def _summarize_feedbacks(
@@ -277,10 +306,12 @@ class EvaluationDisplayFormatter:
metric: str,
feedbacks: List[str],
scores: List[float | None],
strategy: AggregationStrategy
strategy: AggregationStrategy,
) -> str:
if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks):
return "\n\n".join([f"Feedback {i+1}: {fb}" for i, fb in enumerate(feedbacks)])
return "\n\n".join(
[f"Feedback {i+1}: {fb}" for i, fb in enumerate(feedbacks)]
)
try:
llm = create_llm()
@@ -290,20 +321,26 @@ class EvaluationDisplayFormatter:
if len(feedback) > 500:
feedback = feedback[:500] + "..."
score_text = f"{score:.1f}" if score is not None else "N/A"
formatted_feedbacks.append(f"Feedback #{i+1} (Score: {score_text}):\n{feedback}")
formatted_feedbacks.append(
f"Feedback #{i+1} (Score: {score_text}):\n{feedback}"
)
all_feedbacks = "\n\n" + "\n\n---\n\n".join(formatted_feedbacks)
strategy_guidance = ""
if strategy == AggregationStrategy.BEST_PERFORMANCE:
strategy_guidance = "Focus on the highest-scoring aspects and strengths demonstrated."
strategy_guidance = (
"Focus on the highest-scoring aspects and strengths demonstrated."
)
elif strategy == AggregationStrategy.WORST_PERFORMANCE:
strategy_guidance = "Focus on areas that need improvement and common issues across tasks."
else:
strategy_guidance = "Provide a balanced analysis of strengths and weaknesses across all tasks."
prompt = [
{"role": "system", "content": f"""You are an expert evaluator creating a comprehensive summary of agent performance feedback.
{
"role": "system",
"content": f"""You are an expert evaluator creating a comprehensive summary of agent performance feedback.
Your job is to synthesize multiple feedback points about the same metric across different tasks.
Create a concise, insightful summary that captures the key patterns and themes from all feedback.
@@ -315,14 +352,18 @@ class EvaluationDisplayFormatter:
3. Highlighting patterns across tasks
4. 150-250 words in length
The summary should be directly usable as final feedback for the agent's performance on this metric."""},
{"role": "user", "content": f"""I need a synthesized summary of the following feedback for:
The summary should be directly usable as final feedback for the agent's performance on this metric.""",
},
{
"role": "user",
"content": f"""I need a synthesized summary of the following feedback for:
Agent Role: {agent_role}
Metric: {metric.title()}
{all_feedbacks}
"""}
""",
},
]
assert llm is not None
response = llm.call(prompt)
@@ -330,4 +371,6 @@ class EvaluationDisplayFormatter:
return response
except Exception:
return "Synthesized from multiple tasks: " + "\n\n".join([f"- {fb[:500]}..." for fb in feedbacks])
return "Synthesized from multiple tasks: " + "\n\n".join(
[f"- {fb[:500]}..." for fb in feedbacks]
)

View File

@@ -5,25 +5,23 @@ from collections.abc import Sequence
from crewai.agent import Agent
from crewai.task import Task
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
from crewai.utilities.events.agent_events import (
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.event_bus import CrewAIEventsBus
from crewai.events.types.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
LiteAgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent
LiteAgentExecutionCompletedEvent,
)
from crewai.utilities.events.tool_usage_events import (
from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
ToolExecutionErrorEvent,
ToolSelectionErrorEvent,
ToolValidateInputErrorEvent
)
from crewai.utilities.events.llm_events import (
LLMCallStartedEvent,
LLMCallCompletedEvent
ToolValidateInputErrorEvent,
)
from crewai.events.types.llm_events import LLMCallStartedEvent, LLMCallCompletedEvent
class EvaluationTraceCallback(BaseEventListener):
"""Event listener for collecting execution traces for evaluation.
@@ -68,27 +66,49 @@ class EvaluationTraceCallback(BaseEventListener):
@event_bus.on(ToolUsageFinishedEvent)
def on_tool_completed(source, event: ToolUsageFinishedEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.output, success=True)
self.on_tool_use(
event.tool_name, event.tool_args, event.output, success=True
)
@event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="usage_error")
self.on_tool_use(
event.tool_name,
event.tool_args,
event.error,
success=False,
error_type="usage_error",
)
@event_bus.on(ToolExecutionErrorEvent)
def on_tool_execution_error(source, event: ToolExecutionErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="execution_error")
self.on_tool_use(
event.tool_name,
event.tool_args,
event.error,
success=False,
error_type="execution_error",
)
@event_bus.on(ToolSelectionErrorEvent)
def on_tool_selection_error(source, event: ToolSelectionErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="selection_error")
self.on_tool_use(
event.tool_name,
event.tool_args,
event.error,
success=False,
error_type="selection_error",
)
@event_bus.on(ToolValidateInputErrorEvent)
def on_tool_validate_input_error(source, event: ToolValidateInputErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="validation_error")
self.on_tool_use(
event.tool_name,
event.tool_args,
event.error,
success=False,
error_type="validation_error",
)
@event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
@@ -99,7 +119,7 @@ class EvaluationTraceCallback(BaseEventListener):
self.on_llm_call_end(event.messages, event.response)
def on_lite_agent_start(self, agent_info: dict[str, Any]):
self.current_agent_id = agent_info['id']
self.current_agent_id = agent_info["id"]
self.current_task_id = "lite_task"
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
@@ -110,7 +130,7 @@ class EvaluationTraceCallback(BaseEventListener):
tool_uses=[],
llm_calls=[],
start_time=datetime.now(),
final_output=None
final_output=None,
)
def _init_trace(self, trace_key: str, **kwargs: Any):
@@ -128,7 +148,7 @@ class EvaluationTraceCallback(BaseEventListener):
tool_uses=[],
llm_calls=[],
start_time=datetime.now(),
final_output=None
final_output=None,
)
def on_agent_finish(self, agent: Agent, task: Task, output: Any):
@@ -151,8 +171,14 @@ class EvaluationTraceCallback(BaseEventListener):
self._reset_current()
def on_tool_use(self, tool_name: str, tool_args: dict[str, Any] | str, result: Any,
success: bool = True, error_type: str | None = None):
def on_tool_use(
self,
tool_name: str,
tool_args: dict[str, Any] | str,
result: Any,
success: bool = True,
error_type: str | None = None,
):
if not self.current_agent_id or not self.current_task_id:
return
@@ -163,7 +189,7 @@ class EvaluationTraceCallback(BaseEventListener):
"args": tool_args,
"result": result,
"success": success,
"timestamp": datetime.now()
"timestamp": datetime.now(),
}
# Add error information if applicable
@@ -173,7 +199,11 @@ class EvaluationTraceCallback(BaseEventListener):
self.traces[trace_key]["tool_uses"].append(tool_use)
def on_llm_call_start(self, messages: str | Sequence[dict[str, Any]] | None, tools: Sequence[dict[str, Any]] | None = None):
def on_llm_call_start(
self,
messages: str | Sequence[dict[str, Any]] | None,
tools: Sequence[dict[str, Any]] | None = None,
):
if not self.current_agent_id or not self.current_task_id:
return
@@ -186,10 +216,12 @@ class EvaluationTraceCallback(BaseEventListener):
"tools": tools,
"start_time": datetime.now(),
"response": None,
"end_time": None
"end_time": None,
}
def on_llm_call_end(self, messages: str | list[dict[str, Any]] | None, response: Any):
def on_llm_call_end(
self, messages: str | list[dict[str, Any]] | None, response: Any
):
if not self.current_agent_id or not self.current_task_id:
return
@@ -213,7 +245,7 @@ class EvaluationTraceCallback(BaseEventListener):
"response": response,
"start_time": start_time,
"end_time": current_time,
"total_tokens": total_tokens
"total_tokens": total_tokens,
}
self.traces[trace_key]["llm_calls"].append(llm_call)
@@ -227,7 +259,7 @@ class EvaluationTraceCallback(BaseEventListener):
def create_evaluation_callbacks() -> EvaluationTraceCallback:
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.events.event_bus import crewai_event_bus
callback = EvaluationTraceCallback()
callback.setup_listeners(crewai_event_bus)

View File

@@ -25,8 +25,8 @@ from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.types import FlowExecutionData
from crewai.flow.utils import get_possible_return_constants
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.flow_events import (
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowPlotEvent,
@@ -35,10 +35,10 @@ from crewai.utilities.events.flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.utilities.events.listeners.tracing.trace_listener import (
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.utilities.events.listeners.tracing.utils import (
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled,
)
from crewai.utilities.printer import Printer
@@ -934,12 +934,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
method = self._methods[start_method_name]
enhanced_method = self._inject_trigger_payload_for_start_method(method)
result = await self._execute_method(
start_method_name, enhanced_method
)
result = await self._execute_method(start_method_name, enhanced_method)
await self._execute_listeners(start_method_name, result)
def _inject_trigger_payload_for_start_method(self, original_method: Callable) -> Callable:
def _inject_trigger_payload_for_start_method(
self, original_method: Callable
) -> Callable:
def prepare_kwargs(*args, **kwargs):
inputs = baggage.get_baggage("flow_inputs") or {}
trigger_payload = inputs.get("crewai_trigger_payload")
@@ -952,15 +952,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
elif trigger_payload is not None:
self._log_flow_event(
f"Trigger payload available but {original_method.__name__} doesn't accept crewai_trigger_payload parameter",
color="yellow"
color="yellow",
)
return args, kwargs
if asyncio.iscoroutinefunction(original_method):
async def enhanced_method(*args, **kwargs):
args, kwargs = prepare_kwargs(*args, **kwargs)
return await original_method(*args, **kwargs)
else:
def enhanced_method(*args, **kwargs):
args, kwargs = prepare_kwargs(*args, **kwargs)
return original_method(*args, **kwargs)

View File

@@ -1,6 +1,4 @@
import contextlib
import hashlib
import io
import logging
import os
import shutil
@@ -20,23 +18,7 @@ from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger
from crewai.utilities.paths import db_storage_path
from crewai.utilities.chromadb import create_persistent_client
@contextlib.contextmanager
def suppress_logging(
logger_name="chromadb.segment.impl.vector.local_persistent_hnsw",
level=logging.ERROR,
):
logger = logging.getLogger(logger_name)
original_level = logger.getEffectiveLevel()
logger.setLevel(level)
with (
contextlib.redirect_stdout(io.StringIO()),
contextlib.redirect_stderr(io.StringIO()),
contextlib.suppress(UserWarning),
):
yield
logger.setLevel(original_level)
from crewai.utilities.logger_utils import suppress_logging
class KnowledgeStorage(BaseKnowledgeStorage):
@@ -64,7 +46,9 @@ class KnowledgeStorage(BaseKnowledgeStorage):
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Dict[str, Any]]:
with suppress_logging():
with suppress_logging(
"chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR
):
if self.collection:
fetched = self.collection.query(
query_texts=query,

View File

@@ -62,19 +62,14 @@ from crewai.utilities.agent_utils import (
render_text_description_and_args,
)
from crewai.utilities.converter import generate_model_description
from crewai.utilities.events.agent_events import (
AgentLogsExecutionEvent,
from crewai.events.types.logging_events import AgentLogsExecutionEvent
from crewai.events.types.agent_events import (
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMCallType,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.printer import Printer
from crewai.utilities.token_counter_callback import TokenCalcHandler
@@ -519,19 +514,6 @@ class LiteAgent(FlowTrackable, BaseModel):
enforce_rpm_limit(self.request_within_rpm_limit)
llm = cast(LLM, self.llm)
model = llm.model if hasattr(llm, "model") else "unknown"
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=self._messages,
tools=None,
callbacks=self._callbacks,
from_agent=self,
model=model,
),
)
try:
answer = get_llm_response(
llm=cast(LLM, self.llm),
@@ -541,23 +523,7 @@ class LiteAgent(FlowTrackable, BaseModel):
from_agent=self,
)
# Emit LLM call completed event
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(
messages=self._messages,
response=answer,
call_type=LLMCallType.LLM_CALL,
from_agent=self,
model=model,
),
)
except Exception as e:
# Emit LLM call failed event
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=str(e), from_agent=self),
)
raise e
formatted_answer = process_llm_response(answer, self.use_stop_words)

View File

@@ -23,24 +23,25 @@ from dotenv import load_dotenv
from litellm.types.utils import ChatCompletionDeltaToolCall
from pydantic import BaseModel, Field
from crewai.utilities.events.llm_events import (
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMCallType,
LLMStreamChunkEvent,
)
from crewai.utilities.events.tool_usage_events import (
from crewai.events.types.tool_usage_events import (
ToolUsageStartedEvent,
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
)
from crewai.utilities.exceptions import LLMQuotaLimitExceededException
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
import litellm
from litellm import Choices
from litellm.exceptions import ContextWindowExceededError
from litellm.exceptions import ContextWindowExceededError, RateLimitError
from litellm.litellm_core_utils.get_supported_openai_params import (
get_supported_openai_params,
)
@@ -52,7 +53,7 @@ import io
from typing import TextIO
from crewai.llms.base_llm import BaseLLM
from crewai.utilities.events import crewai_event_bus
from crewai.events.event_bus import crewai_event_bus
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
@@ -311,7 +312,7 @@ class LLM(BaseLLM):
api_base: Optional[str] = None,
api_version: Optional[str] = None,
api_key: Optional[str] = None,
callbacks: List[Any] = [],
callbacks: List[Any] | None = None,
reasoning_effort: Optional[Literal["none", "low", "medium", "high"]] = None,
stream: bool = False,
**kwargs,
@@ -351,7 +352,7 @@ class LLM(BaseLLM):
else:
self.stop = stop
self.set_callbacks(callbacks)
self.set_callbacks(callbacks or [])
self.set_env_callbacks()
def _is_anthropic_model(self, model: str) -> bool:
@@ -669,6 +670,10 @@ class LLM(BaseLLM):
)
return full_response
except RateLimitError as e:
# Convert litellm's rate limit error to our own exception type
# for graceful quota limit handling
raise LLMQuotaLimitExceededException(str(e))
except ContextWindowExceededError as e:
# Catch context window errors from litellm and convert them to our own exception type.
# This exception is handled by CrewAgentExecutor._invoke_loop() which can then
@@ -812,6 +817,10 @@ class LLM(BaseLLM):
# length issues appropriately.
response = litellm.completion(**params)
except RateLimitError as e:
# Convert litellm's rate limit error to our own exception type
# for graceful quota limit handling
raise LLMQuotaLimitExceededException(str(e))
except ContextWindowExceededError as e:
# Convert litellm's context window error to our own exception type
# for consistent handling in the rest of the codebase
@@ -851,7 +860,9 @@ class LLM(BaseLLM):
return tool_calls
# --- 7) Handle tool calls if present
tool_result = self._handle_tool_call(tool_calls, available_functions)
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
# --- 8) If tool call handling didn't return a result, emit completion event and return text response
@@ -868,6 +879,8 @@ class LLM(BaseLLM):
self,
tool_calls: List[Any],
available_functions: Optional[Dict[str, Any]] = None,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
) -> Optional[str]:
"""Handle a tool call from the LLM.
@@ -902,6 +915,8 @@ class LLM(BaseLLM):
event=ToolUsageStartedEvent(
tool_name=function_name,
tool_args=function_args,
from_agent=from_agent,
from_task=from_task,
),
)
@@ -914,12 +929,17 @@ class LLM(BaseLLM):
tool_args=function_args,
started_at=started_at,
finished_at=datetime.now(),
from_task=from_task,
from_agent=from_agent,
),
)
# --- 3.3) Emit success event
self._handle_emit_call_events(
response=result, call_type=LLMCallType.TOOL_CALL
response=result,
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
)
return result
except Exception as e:
@@ -939,6 +959,8 @@ class LLM(BaseLLM):
tool_name=function_name,
tool_args=function_args,
error=f"Tool execution error: {str(e)}",
from_task=from_task,
from_agent=from_agent,
),
)
return None
@@ -1139,7 +1161,11 @@ class LLM(BaseLLM):
# TODO: Remove this code after merging PR https://github.com/BerriAI/litellm/pull/10917
# Ollama doesn't supports last message to be 'assistant'
if "ollama" in self.model.lower() and messages and messages[-1]["role"] == "assistant":
if (
"ollama" in self.model.lower()
and messages
and messages[-1]["role"] == "assistant"
):
return messages + [{"role": "user", "content": ""}]
# Handle Anthropic models

View File

@@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, TYPE_CHECKING
from crewai.memory import (
EntityMemory,
@@ -7,6 +7,10 @@ from crewai.memory import (
ShortTermMemory,
)
if TYPE_CHECKING:
from crewai.agent import Agent
from crewai.task import Task
class ContextualMemory:
def __init__(
@@ -15,11 +19,28 @@ class ContextualMemory:
ltm: LongTermMemory,
em: EntityMemory,
exm: ExternalMemory,
agent: Optional["Agent"] = None,
task: Optional["Task"] = None,
):
self.stm = stm
self.ltm = ltm
self.em = em
self.exm = exm
self.agent = agent
self.task = task
if self.stm is not None:
self.stm.agent = self.agent
self.stm.task = self.task
if self.ltm is not None:
self.ltm.agent = self.agent
self.ltm.task = self.task
if self.em is not None:
self.em.agent = self.agent
self.em.task = self.task
if self.exm is not None:
self.exm.agent = self.agent
self.exm.task = self.task
def build_context_for_task(self, task, context) -> str:
"""
@@ -49,10 +70,7 @@ class ContextualMemory:
stm_results = self.stm.search(query)
formatted_results = "\n".join(
[
f"- {result['context']}"
for result in stm_results
]
[f"- {result['context']}" for result in stm_results]
)
return f"Recent Insights:\n{formatted_results}" if stm_results else ""
@@ -89,10 +107,7 @@ class ContextualMemory:
em_results = self.em.search(query)
formatted_results = "\n".join(
[
f"- {result['context']}"
for result in em_results
] # type: ignore # Invalid index type "str" for "str"; expected type "SupportsIndex | slice"
[f"- {result['context']}" for result in em_results] # type: ignore # Invalid index type "str" for "str"; expected type "SupportsIndex | slice"
)
return f"Entities:\n{formatted_results}" if em_results else ""

View File

@@ -1,4 +1,4 @@
from typing import Optional
from typing import Any
import time
from pydantic import PrivateAttr
@@ -6,8 +6,8 @@ from pydantic import PrivateAttr
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
@@ -24,7 +24,7 @@ class EntityMemory(Memory):
Inherits from the Memory class.
"""
_memory_provider: Optional[str] = PrivateAttr()
_memory_provider: str | None = PrivateAttr()
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
memory_provider = embedder_config.get("provider") if embedder_config else None
@@ -35,7 +35,7 @@ class EntityMemory(Memory):
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
config = embedder_config.get("config")
config = embedder_config.get("config") if embedder_config else None
storage = Mem0Storage(type="short_term", crew=crew, config=config)
else:
storage = (
@@ -53,47 +53,99 @@ class EntityMemory(Memory):
super().__init__(storage=storage)
self._memory_provider = memory_provider
def save(self, item: EntityMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
"""Saves an entity item into the SQLite storage."""
def save(
self,
value: EntityMemoryItem | list[EntityMemoryItem],
metadata: dict[str, Any] | None = None,
) -> None:
"""Saves one or more entity items into the SQLite storage.
Args:
value: Single EntityMemoryItem or list of EntityMemoryItems to save.
metadata: Optional metadata dict (included for supertype compatibility but not used).
Notes:
The metadata parameter is included to satisfy the supertype signature but is not
used - entity metadata is extracted from the EntityMemoryItem objects themselves.
"""
if not value:
return
items = value if isinstance(value, list) else [value]
is_batch = len(items) > 1
metadata = {"entity_count": len(items)} if is_batch else items[0].metadata
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
metadata=item.metadata,
metadata=metadata,
source_type="entity_memory",
from_agent=self.agent,
from_task=self.task,
),
)
start_time = time.time()
saved_count = 0
errors = []
try:
if self._memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
for item in items:
try:
if self._memory_provider == "mem0":
data = f"""
Remember details about the following entity:
Name: {item.name}
Type: {item.type}
Entity Description: {item.description}
"""
else:
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
saved_count += 1
except Exception as e:
errors.append(f"{item.name}: {str(e)}")
if is_batch:
emit_value = f"Saved {saved_count} entities"
metadata = {"entity_count": saved_count, "errors": errors}
else:
data = f"{item.name}({item.type}): {item.description}"
emit_value = f"{items[0].name}({items[0].type}): {items[0].description}"
metadata = items[0].metadata
super().save(data, item.metadata)
# Emit memory save completed event
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=data,
metadata=item.metadata,
value=emit_value,
metadata=metadata,
save_time_ms=(time.time() - start_time) * 1000,
source_type="entity_memory",
from_agent=self.agent,
from_task=self.task,
),
)
if errors:
raise Exception(
f"Partial save: {len(errors)} failed out of {len(items)}"
)
except Exception as e:
fail_metadata = (
{"entity_count": len(items), "saved": saved_count}
if is_batch
else items[0].metadata
)
crewai_event_bus.emit(
self,
event=MemorySaveFailedEvent(
metadata=item.metadata,
metadata=fail_metadata,
error=str(e),
source_type="entity_memory",
from_agent=self.agent,
from_task=self.task,
),
)
raise
@@ -111,6 +163,8 @@ class EntityMemory(Memory):
limit=limit,
score_threshold=score_threshold,
source_type="entity_memory",
from_agent=self.agent,
from_task=self.task,
),
)
@@ -129,6 +183,8 @@ class EntityMemory(Memory):
score_threshold=score_threshold,
query_time_ms=(time.time() - start_time) * 1000,
source_type="entity_memory",
from_agent=self.agent,
from_task=self.task,
),
)

View File

@@ -4,8 +4,8 @@ import time
from crewai.memory.external.external_memory_item import ExternalMemoryItem
from crewai.memory.memory import Memory
from crewai.memory.storage.interface import Storage
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
@@ -53,7 +53,6 @@ class ExternalMemory(Memory):
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""Saves a value into the external storage."""
crewai_event_bus.emit(
@@ -61,24 +60,30 @@ class ExternalMemory(Memory):
event=MemorySaveStartedEvent(
value=value,
metadata=metadata,
agent_role=agent,
source_type="external_memory",
from_agent=self.agent,
from_task=self.task,
),
)
start_time = time.time()
try:
item = ExternalMemoryItem(value=value, metadata=metadata, agent=agent)
super().save(value=item.value, metadata=item.metadata, agent=item.agent)
item = ExternalMemoryItem(
value=value,
metadata=metadata,
agent=self.agent.role if self.agent else None,
)
super().save(value=item.value, metadata=item.metadata)
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=value,
metadata=metadata,
agent_role=agent,
save_time_ms=(time.time() - start_time) * 1000,
source_type="external_memory",
from_agent=self.agent,
from_task=self.task,
),
)
except Exception as e:
@@ -87,9 +92,10 @@ class ExternalMemory(Memory):
event=MemorySaveFailedEvent(
value=value,
metadata=metadata,
agent_role=agent,
error=str(e),
source_type="external_memory",
from_agent=self.agent,
from_task=self.task,
),
)
raise
@@ -107,6 +113,8 @@ class ExternalMemory(Memory):
limit=limit,
score_threshold=score_threshold,
source_type="external_memory",
from_agent=self.agent,
from_task=self.task,
),
)
@@ -125,6 +133,8 @@ class ExternalMemory(Memory):
score_threshold=score_threshold,
query_time_ms=(time.time() - start_time) * 1000,
source_type="external_memory",
from_agent=self.agent,
from_task=self.task,
),
)

View File

@@ -3,8 +3,8 @@ import time
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.memory import Memory
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
@@ -37,13 +37,17 @@ class LongTermMemory(Memory):
metadata=item.metadata,
agent_role=item.agent,
source_type="long_term_memory",
from_agent=self.agent,
from_task=self.task,
),
)
start_time = time.time()
try:
metadata = item.metadata
metadata.update({"agent": item.agent, "expected_output": item.expected_output})
metadata.update(
{"agent": item.agent, "expected_output": item.expected_output}
)
self.storage.save( # type: ignore # BUG?: Unexpected keyword argument "task_description","score","datetime" for "save" of "Storage"
task_description=item.task,
score=metadata["quality"],
@@ -59,6 +63,8 @@ class LongTermMemory(Memory):
agent_role=item.agent,
save_time_ms=(time.time() - start_time) * 1000,
source_type="long_term_memory",
from_agent=self.agent,
from_task=self.task,
),
)
except Exception as e:
@@ -74,13 +80,19 @@ class LongTermMemory(Memory):
)
raise
def search(self, task: str, latest_n: int = 3) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory"
def search( # type: ignore # signature of "search" incompatible with supertype "Memory"
self,
task: str,
latest_n: int = 3,
) -> List[Dict[str, Any]]: # type: ignore # signature of "search" incompatible with supertype "Memory"
crewai_event_bus.emit(
self,
event=MemoryQueryStartedEvent(
query=task,
limit=latest_n,
source_type="long_term_memory",
from_agent=self.agent,
from_task=self.task,
),
)
@@ -96,6 +108,8 @@ class LongTermMemory(Memory):
limit=latest_n,
query_time_ms=(time.time() - start_time) * 1000,
source_type="long_term_memory",
from_agent=self.agent,
from_task=self.task,
),
)

View File

@@ -1,7 +1,11 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, TYPE_CHECKING
from pydantic import BaseModel
if TYPE_CHECKING:
from crewai.agent import Agent
from crewai.task import Task
class Memory(BaseModel):
"""
@@ -12,19 +16,38 @@ class Memory(BaseModel):
crew: Optional[Any] = None
storage: Any
_agent: Optional["Agent"] = None
_task: Optional["Task"] = None
def __init__(self, storage: Any, **data: Any):
super().__init__(storage=storage, **data)
@property
def task(self) -> Optional["Task"]:
"""Get the current task associated with this memory."""
return self._task
@task.setter
def task(self, task: Optional["Task"]) -> None:
"""Set the current task associated with this memory."""
self._task = task
@property
def agent(self) -> Optional["Agent"]:
"""Get the current agent associated with this memory."""
return self._agent
@agent.setter
def agent(self, agent: Optional["Agent"]) -> None:
"""Set the current agent associated with this memory."""
self._agent = agent
def save(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
metadata = metadata or {}
if agent:
metadata["agent"] = agent
self.storage.save(value, metadata)

View File

@@ -6,8 +6,8 @@ from pydantic import PrivateAttr
from crewai.memory.memory import Memory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
MemoryQueryStartedEvent,
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
@@ -37,7 +37,7 @@ class ShortTermMemory(Memory):
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
config = embedder_config.get("config")
config = embedder_config.get("config") if embedder_config else None
storage = Mem0Storage(type="short_term", crew=crew, config=config)
else:
storage = (
@@ -57,34 +57,42 @@ class ShortTermMemory(Memory):
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
crewai_event_bus.emit(
self,
event=MemorySaveStartedEvent(
value=value,
metadata=metadata,
agent_role=agent,
source_type="short_term_memory",
from_agent=self.agent,
from_task=self.task,
),
)
start_time = time.time()
try:
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
item = ShortTermMemoryItem(
data=value,
metadata=metadata,
agent=self.agent.role if self.agent else None,
)
if self._memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
item.data = (
f"Remember the following insights from Agent run: {item.data}"
)
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
super().save(value=item.data, metadata=item.metadata)
crewai_event_bus.emit(
self,
event=MemorySaveCompletedEvent(
value=value,
metadata=metadata,
agent_role=agent,
# agent_role=agent,
save_time_ms=(time.time() - start_time) * 1000,
source_type="short_term_memory",
from_agent=self.agent,
from_task=self.task,
),
)
except Exception as e:
@@ -93,9 +101,10 @@ class ShortTermMemory(Memory):
event=MemorySaveFailedEvent(
value=value,
metadata=metadata,
agent_role=agent,
error=str(e),
source_type="short_term_memory",
from_agent=self.agent,
from_task=self.task,
),
)
raise
@@ -113,6 +122,8 @@ class ShortTermMemory(Memory):
limit=limit,
score_threshold=score_threshold,
source_type="short_term_memory",
from_agent=self.agent,
from_task=self.task,
),
)
@@ -131,6 +142,8 @@ class ShortTermMemory(Memory):
score_threshold=score_threshold,
query_time_ms=(time.time() - start_time) * 1000,
source_type="short_term_memory",
from_agent=self.agent,
from_task=self.task,
),
)

View File

@@ -18,9 +18,7 @@ class KickoffTaskOutputsSQLiteStorage:
An updated SQLite storage class for kickoff task outputs storage.
"""
def __init__(
self, db_path: Optional[str] = None
) -> None:
def __init__(self, db_path: Optional[str] = None) -> None:
if db_path is None:
# Get the parent directory of the default db path and create our db file there
db_path = str(Path(db_storage_path()) / "latest_kickoff_task_outputs.db")
@@ -67,7 +65,7 @@ class KickoffTaskOutputsSQLiteStorage:
output: Dict[str, Any],
task_index: int,
was_replayed: bool = False,
inputs: Dict[str, Any] = {},
inputs: Dict[str, Any] | None = None,
) -> None:
"""Add a new task output record to the database.
@@ -81,6 +79,7 @@ class KickoffTaskOutputsSQLiteStorage:
Raises:
DatabaseOperationError: If saving the task output fails due to SQLite errors.
"""
inputs = inputs or {}
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute("BEGIN TRANSACTION")
@@ -146,7 +145,9 @@ class KickoffTaskOutputsSQLiteStorage:
conn.commit()
if cursor.rowcount == 0:
logger.warning(f"No row found with task_index {task_index}. No update performed.")
logger.warning(
f"No row found with task_index {task_index}. No update performed."
)
except sqlite3.Error as e:
error_msg = DatabaseError.format_error(DatabaseError.UPDATE_ERROR, e)
logger.error(error_msg)

View File

@@ -1,5 +1,3 @@
import contextlib
import io
import logging
import os
import shutil
@@ -12,26 +10,10 @@ from crewai.rag.embeddings.configurator import EmbeddingConfigurator
from crewai.utilities.chromadb import create_persistent_client
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path
from crewai.utilities.logger_utils import suppress_logging
import warnings
@contextlib.contextmanager
def suppress_logging(
logger_name="chromadb.segment.impl.vector.local_persistent_hnsw",
level=logging.ERROR,
):
logger = logging.getLogger(logger_name)
original_level = logger.getEffectiveLevel()
logger.setLevel(level)
with (
contextlib.redirect_stdout(io.StringIO()),
contextlib.redirect_stderr(io.StringIO()),
contextlib.suppress(UserWarning),
):
yield
logger.setLevel(original_level)
class RAGStorage(BaseRAGStorage):
"""
Extends Storage to handle embeddings for memory entries, improving
@@ -122,7 +104,9 @@ class RAGStorage(BaseRAGStorage):
self._initialize_app()
try:
with suppress_logging():
with suppress_logging(
"chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR
):
response = self.collection.query(query_texts=query, n_results=limit)
results = []

View File

@@ -1 +1,58 @@
"""RAG (Retrieval-Augmented Generation) infrastructure for CrewAI."""
"""RAG (Retrieval-Augmented Generation) infrastructure for CrewAI."""
import sys
import importlib
from types import ModuleType
from typing import Any
from crewai.rag.config.types import RagConfigType
from crewai.rag.config.utils import set_rag_config
_module_path = __path__
_module_file = __file__
class _RagModule(ModuleType):
"""Module wrapper to intercept attribute setting for config."""
__path__ = _module_path
__file__ = _module_file
def __init__(self, module_name: str):
"""Initialize the module wrapper.
Args:
module_name: Name of the module.
"""
super().__init__(module_name)
def __setattr__(self, name: str, value: RagConfigType) -> None:
"""Set module attributes.
Args:
name: Attribute name.
value: Attribute value.
"""
if name == "config":
return set_rag_config(value)
raise AttributeError(f"Setting attribute '{name}' is not allowed.")
def __getattr__(self, name: str) -> Any:
"""Get module attributes.
Args:
name: Attribute name.
Returns:
The requested attribute.
Raises:
AttributeError: If attribute doesn't exist.
"""
try:
return importlib.import_module(f"{self.__name__}.{name}")
except ImportError:
raise AttributeError(f"module '{self.__name__}' has no attribute '{name}'")
sys.modules[__name__] = _RagModule(__name__)

View File

@@ -1,5 +1,6 @@
"""ChromaDB client implementation."""
import logging
from typing import Any
from chromadb.api.types import (
@@ -20,7 +21,9 @@ from crewai.rag.chromadb.utils import (
_is_sync_client,
_prepare_documents_for_chromadb,
_process_query_results,
_sanitize_collection_name,
)
from crewai.utilities.logger_utils import suppress_logging
from crewai.rag.core.base_client import (
BaseClient,
BaseCollectionParams,
@@ -40,8 +43,19 @@ class ChromaDBClient(BaseClient):
embedding_function: Function to generate embeddings for documents.
"""
client: ChromaDBClientType
embedding_function: ChromaEmbeddingFunction[Embeddable]
def __init__(
self,
client: ChromaDBClientType,
embedding_function: ChromaEmbeddingFunction[Embeddable],
) -> None:
"""Initialize ChromaDBClient with client and embedding function.
Args:
client: Pre-configured ChromaDB client instance.
embedding_function: Embedding function for text to vector conversion.
"""
self.client = client
self.embedding_function = embedding_function
def create_collection(
self, **kwargs: Unpack[ChromaDBCollectionCreateParams]
@@ -86,7 +100,7 @@ class ChromaDBClient(BaseClient):
metadata["hnsw:space"] = "cosine"
self.client.create_collection(
name=kwargs["collection_name"],
name=_sanitize_collection_name(kwargs["collection_name"]),
configuration=kwargs.get("configuration"),
metadata=metadata,
embedding_function=kwargs.get(
@@ -143,7 +157,7 @@ class ChromaDBClient(BaseClient):
metadata["hnsw:space"] = "cosine"
await self.client.create_collection(
name=kwargs["collection_name"],
name=_sanitize_collection_name(kwargs["collection_name"]),
configuration=kwargs.get("configuration"),
metadata=metadata,
embedding_function=kwargs.get(
@@ -194,7 +208,7 @@ class ChromaDBClient(BaseClient):
metadata["hnsw:space"] = "cosine"
return self.client.get_or_create_collection(
name=kwargs["collection_name"],
name=_sanitize_collection_name(kwargs["collection_name"]),
configuration=kwargs.get("configuration"),
metadata=metadata,
embedding_function=kwargs.get(
@@ -247,7 +261,7 @@ class ChromaDBClient(BaseClient):
metadata["hnsw:space"] = "cosine"
return await self.client.get_or_create_collection(
name=kwargs["collection_name"],
name=_sanitize_collection_name(kwargs["collection_name"]),
configuration=kwargs.get("configuration"),
metadata=metadata,
embedding_function=kwargs.get(
@@ -287,12 +301,12 @@ class ChromaDBClient(BaseClient):
raise ValueError("Documents list cannot be empty")
collection = self.client.get_collection(
name=collection_name,
name=_sanitize_collection_name(collection_name),
embedding_function=self.embedding_function,
)
prepared = _prepare_documents_for_chromadb(documents)
collection.add(
collection.upsert(
ids=prepared.ids,
documents=prepared.texts,
metadatas=prepared.metadatas,
@@ -329,11 +343,11 @@ class ChromaDBClient(BaseClient):
raise ValueError("Documents list cannot be empty")
collection = await self.client.get_collection(
name=collection_name,
name=_sanitize_collection_name(collection_name),
embedding_function=self.embedding_function,
)
prepared = _prepare_documents_for_chromadb(documents)
await collection.add(
await collection.upsert(
ids=prepared.ids,
documents=prepared.texts,
metadatas=prepared.metadatas,
@@ -374,19 +388,22 @@ class ChromaDBClient(BaseClient):
params = _extract_search_params(kwargs)
collection = self.client.get_collection(
name=params.collection_name,
name=_sanitize_collection_name(params.collection_name),
embedding_function=self.embedding_function,
)
where = params.where if params.where is not None else params.metadata_filter
results: QueryResult = collection.query(
query_texts=[params.query],
n_results=params.limit,
where=where,
where_document=params.where_document,
include=params.include,
)
with suppress_logging(
"chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR
):
results: QueryResult = collection.query(
query_texts=[params.query],
n_results=params.limit,
where=where,
where_document=params.where_document,
include=params.include,
)
return _process_query_results(
collection=collection,
@@ -429,19 +446,22 @@ class ChromaDBClient(BaseClient):
params = _extract_search_params(kwargs)
collection = await self.client.get_collection(
name=params.collection_name,
name=_sanitize_collection_name(params.collection_name),
embedding_function=self.embedding_function,
)
where = params.where if params.where is not None else params.metadata_filter
results: QueryResult = await collection.query(
query_texts=[params.query],
n_results=params.limit,
where=where,
where_document=params.where_document,
include=params.include,
)
with suppress_logging(
"chromadb.segment.impl.vector.local_persistent_hnsw", logging.ERROR
):
results: QueryResult = await collection.query(
query_texts=[params.query],
n_results=params.limit,
where=where,
where_document=params.where_document,
include=params.include,
)
return _process_query_results(
collection=collection,
@@ -474,7 +494,7 @@ class ChromaDBClient(BaseClient):
)
collection_name = kwargs["collection_name"]
self.client.delete_collection(name=collection_name)
self.client.delete_collection(name=_sanitize_collection_name(collection_name))
async def adelete_collection(self, **kwargs: Unpack[BaseCollectionParams]) -> None:
"""Delete a collection and all its data asynchronously.
@@ -504,7 +524,9 @@ class ChromaDBClient(BaseClient):
)
collection_name = kwargs["collection_name"]
await self.client.delete_collection(name=collection_name)
await self.client.delete_collection(
name=_sanitize_collection_name(collection_name)
)
def reset(self) -> None:
"""Reset the vector database by deleting all collections and data.

View File

@@ -0,0 +1,65 @@
"""ChromaDB configuration model."""
import warnings
from dataclasses import field
from typing import Literal, cast
from pydantic.dataclasses import dataclass as pyd_dataclass
from chromadb.config import Settings
from chromadb.utils.embedding_functions import DefaultEmbeddingFunction
from crewai.rag.chromadb.types import ChromaEmbeddingFunctionWrapper
from crewai.rag.config.base import BaseRagConfig
from crewai.rag.chromadb.constants import (
DEFAULT_TENANT,
DEFAULT_DATABASE,
DEFAULT_STORAGE_PATH,
)
warnings.filterwarnings(
"ignore",
message=".*Mixing V1 models and V2 models.*",
category=UserWarning,
module="pydantic._internal._generate_schema",
)
warnings.filterwarnings(
"ignore",
message=r".*'model_fields'.*is deprecated.*",
module=r"^chromadb(\.|$)",
)
def _default_settings() -> Settings:
"""Create default ChromaDB settings.
Returns:
Settings with persistent storage and reset enabled.
"""
return Settings(
persist_directory=DEFAULT_STORAGE_PATH,
allow_reset=True,
is_persistent=True,
)
def _default_embedding_function() -> ChromaEmbeddingFunctionWrapper:
"""Create default ChromaDB embedding function.
Returns:
Default embedding function using all-MiniLM-L6-v2 via ONNX.
"""
return cast(ChromaEmbeddingFunctionWrapper, DefaultEmbeddingFunction())
@pyd_dataclass(frozen=True)
class ChromaDBConfig(BaseRagConfig):
"""Configuration for ChromaDB client."""
provider: Literal["chromadb"] = field(default="chromadb", init=False)
tenant: str = DEFAULT_TENANT
database: str = DEFAULT_DATABASE
settings: Settings = field(default_factory=_default_settings)
embedding_function: ChromaEmbeddingFunctionWrapper = field(
default_factory=_default_embedding_function
)

View File

@@ -0,0 +1,17 @@
"""Constants for ChromaDB configuration."""
import re
from typing import Final
from crewai.utilities.paths import db_storage_path
DEFAULT_TENANT: Final[str] = "default_tenant"
DEFAULT_DATABASE: Final[str] = "default_database"
DEFAULT_STORAGE_PATH: Final[str] = db_storage_path()
MIN_COLLECTION_LENGTH: Final[int] = 3
MAX_COLLECTION_LENGTH: Final[int] = 63
DEFAULT_COLLECTION: Final[str] = "default_collection"
INVALID_CHARS_PATTERN: Final[re.Pattern[str]] = re.compile(r"[^a-zA-Z0-9_-]")
IPV4_PATTERN: Final[re.Pattern[str]] = re.compile(r"^(\d{1,3}\.){3}\d{1,3}$")

View File

@@ -0,0 +1,40 @@
"""Factory functions for creating ChromaDB clients."""
import os
from hashlib import md5
import portalocker
from chromadb import PersistentClient
from crewai.rag.chromadb.config import ChromaDBConfig
from crewai.rag.chromadb.client import ChromaDBClient
def create_client(config: ChromaDBConfig) -> ChromaDBClient:
"""Create a ChromaDBClient from configuration.
Args:
config: ChromaDB configuration object.
Returns:
Configured ChromaDBClient instance.
Notes:
Need to update to use chromadb.Client to support more client types in the near future.
"""
persist_dir = config.settings.persist_directory
lock_id = md5(persist_dir.encode(), usedforsecurity=False).hexdigest()
lockfile = os.path.join(persist_dir, f"chromadb-{lock_id}.lock")
with portalocker.Lock(lockfile):
client = PersistentClient(
path=persist_dir,
settings=config.settings,
tenant=config.tenant,
database=config.database,
)
return ChromaDBClient(
client=client,
embedding_function=config.embedding_function,
)

View File

@@ -3,6 +3,8 @@
from collections.abc import Mapping
from typing import Any, NamedTuple
from pydantic import GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema
from chromadb.api import ClientAPI, AsyncClientAPI
from chromadb.api.configuration import CollectionConfigurationInterface
from chromadb.api.types import (
@@ -21,6 +23,21 @@ from crewai.rag.core.base_client import BaseCollectionParams, BaseCollectionSear
ChromaDBClientType = ClientAPI | AsyncClientAPI
class ChromaEmbeddingFunctionWrapper(ChromaEmbeddingFunction[Embeddable]):
"""Base class for ChromaDB EmbeddingFunction to work with Pydantic validation."""
@classmethod
def __get_pydantic_core_schema__(
cls, _source_type: Any, _handler: GetCoreSchemaHandler
) -> CoreSchema:
"""Generate Pydantic core schema for ChromaDB EmbeddingFunction.
This allows Pydantic to handle ChromaDB's EmbeddingFunction type
without requiring arbitrary_types_allowed=True.
"""
return core_schema.any_schema()
class PreparedDocuments(NamedTuple):
"""Prepared documents ready for ChromaDB insertion.

View File

@@ -10,10 +10,15 @@ from chromadb.api.types import (
IncludeEnum,
QueryResult,
)
from chromadb.api.models.AsyncCollection import AsyncCollection
from chromadb.api.models.Collection import Collection
from crewai.rag.chromadb.constants import (
DEFAULT_COLLECTION,
INVALID_CHARS_PATTERN,
IPV4_PATTERN,
MAX_COLLECTION_LENGTH,
MIN_COLLECTION_LENGTH,
)
from crewai.rag.chromadb.types import (
ChromaDBClientType,
ChromaDBCollectionSearchParams,
@@ -218,3 +223,58 @@ def _process_query_results(
distance_metric=distance_metric,
score_threshold=params.score_threshold,
)
def _is_ipv4_pattern(name: str) -> bool:
"""Check if a string matches an IPv4 address pattern.
Args:
name: The string to check
Returns:
True if the string matches an IPv4 pattern, False otherwise
"""
return bool(IPV4_PATTERN.match(name))
def _sanitize_collection_name(
name: str | None, max_collection_length: int = MAX_COLLECTION_LENGTH
) -> str:
"""Sanitize a collection name to meet ChromaDB requirements.
Requirements:
1. 3-63 characters long
2. Starts and ends with alphanumeric character
3. Contains only alphanumeric characters, underscores, or hyphens
4. No consecutive periods
5. Not a valid IPv4 address
Args:
name: The original collection name to sanitize
max_collection_length: Maximum allowed length for the collection name
Returns:
A sanitized collection name that meets ChromaDB requirements
"""
if not name:
return DEFAULT_COLLECTION
if _is_ipv4_pattern(name):
name = f"ip_{name}"
sanitized = INVALID_CHARS_PATTERN.sub("_", name)
if not sanitized[0].isalnum():
sanitized = "a" + sanitized
if not sanitized[-1].isalnum():
sanitized = sanitized[:-1] + "z"
if len(sanitized) < MIN_COLLECTION_LENGTH:
sanitized = sanitized + "x" * (MIN_COLLECTION_LENGTH - len(sanitized))
if len(sanitized) > max_collection_length:
sanitized = sanitized[:max_collection_length]
if not sanitized[-1].isalnum():
sanitized = sanitized[:-1] + "z"
return sanitized

View File

@@ -0,0 +1 @@
"""RAG client configuration management using ContextVars for thread-safe provider switching."""

View File

@@ -0,0 +1,16 @@
"""Base configuration class for RAG providers."""
from dataclasses import field
from typing import Any
from pydantic.dataclasses import dataclass as pyd_dataclass
from crewai.rag.config.optional_imports.types import SupportedProvider
@pyd_dataclass(frozen=True)
class BaseRagConfig:
"""Base class for RAG configuration with Pydantic serialization support."""
provider: SupportedProvider = field(init=False)
embedding_function: Any | None = field(default=None)

View File

@@ -0,0 +1,8 @@
"""Constants for RAG configuration."""
from typing import Final
DISCRIMINATOR: Final[str] = "provider"
DEFAULT_RAG_CONFIG_PATH: Final[str] = "crewai.rag.chromadb.config"
DEFAULT_RAG_CONFIG_CLASS: Final[str] = "ChromaDBConfig"

View File

@@ -0,0 +1 @@
"""Optional imports for RAG configuration providers."""

View File

@@ -0,0 +1,26 @@
"""Base classes for missing provider configurations."""
from typing import Literal
from dataclasses import field
from pydantic import ConfigDict
from pydantic.dataclasses import dataclass as pyd_dataclass
@pyd_dataclass(config=ConfigDict(extra="forbid"))
class _MissingProvider:
"""Base class for missing provider configurations.
Raises RuntimeError when instantiated to indicate missing dependencies.
"""
provider: Literal["chromadb", "qdrant", "__missing__"] = field(
default="__missing__"
)
def __post_init__(self) -> None:
"""Raises error indicating the provider is not installed."""
raise RuntimeError(
f"provider '{self.provider}' requested but not installed. "
f"Install the extra: `uv add crewai'[{self.provider}]'`."
)

Some files were not shown because too many files have changed in this diff Show More