Compare commits


68 Commits

Author SHA1 Message Date
Greyson LaLonde
af84ba2272 Merge branch 'main' into gl/fix/cache-handler-types-and-imports 2025-09-05 10:03:41 -04:00
Greyson LaLonde
610c1f70c0 chore: relax mypy configuration and exclude tests from CI (#3452) 2025-09-05 10:00:05 -04:00
Greyson LaLonde
e93d597721 fix: add type annotations to contextual_memory.py 2025-09-05 09:57:02 -04:00
Greyson LaLonde
a414e7f2a7 fix: update test files to use 'limit' instead of 'latest_n' and fix metadata in MemorySaveCompletedEvent 2025-09-05 09:22:21 -04:00
Greyson LaLonde
fbcd8bcd83 fix: update contextual_memory to use 'limit' instead of 'latest_n' 2025-09-05 09:09:33 -04:00
Greyson LaLonde
5f776bbb0a fix: update import to use crewai.llms.base_llm 2025-09-05 08:58:43 -04:00
Greyson LaLonde
909b2fd0ef fix: use create_default_llm when llm is None in BaseEvaluator 2025-09-05 08:57:58 -04:00
Greyson LaLonde
929f9dadb4 fix: remove unnecessary wraps parameter in test patch 2025-09-04 23:15:20 -04:00
Greyson LaLonde
4ef4632a8c fix: update return type annotations in OpenAIAgentAdapter 2025-09-04 23:08:56 -04:00
Greyson LaLonde
c246df3cb2 fix: add type annotations to converter instance variables 2025-09-04 23:02:13 -04:00
Greyson LaLonde
4fd40e7857 fix: add missing super call in LangGraphConverterAdapter 2025-09-04 22:58:18 -04:00
Greyson LaLonde
25204c6cb8 fix: add type annotations to structured output converter 2025-09-04 22:53:32 -04:00
Greyson LaLonde
b44776c367 fix: resolve mypy type errors across agent adapters and core modules 2025-09-04 22:47:18 -04:00
Greyson LaLonde
843801f554 fix: make task required in CrewAgentExecutor and fix all type annotations
- Make task parameter required in CrewAgentExecutor.__init__
- Update Agent.create_agent_executor to require task parameter
- Handle cases where crew can be None (standalone agent usage)
- Update base class signatures to match
- Remove unnecessary create_agent_executor calls during setup
- Add missing type annotations in base_agent_executor_mixin
- Fix all type errors in base_agent.py using Self return type
- Add assert for agent_executor before use
- Fix crew access checks to handle None case
2025-09-04 22:13:46 -04:00
Greyson LaLonde
2faa13ddcb refactor: improve type annotations and simplify code in CrewAgentExecutor 2025-09-04 17:07:02 -04:00
Greyson LaLonde
e385b45667 fix: update cache test assertions for JSON serialization 2025-09-04 16:10:46 -04:00
Greyson LaLonde
f03567d463 Merge branch 'main' into gl/fix/cache-handler-types-and-imports 2025-09-04 16:01:26 -04:00
Greyson LaLonde
e9f4ac070b chore: Relax mypy to not run on tests dir for now 2025-09-04 15:57:17 -04:00
Greyson LaLonde
bcee792390 fix: resolve mypy errors in storage and tracing modules 2025-09-04 15:39:01 -04:00
Greyson LaLonde
ab82da02f9 refactor: cleanup crew agent executor (#3440)
refactor: cleanup crew agent executor & add docs

- Remove dead code, unused imports, and obsolete methods
- Modernize with updated type hints and static _format_prompt
- Add docstrings for clarity
2025-09-04 15:32:47 -04:00
Greyson LaLonde
221bfcccce refactor: consolidate ChromaDB response extraction logic 2025-09-04 15:21:48 -04:00
Greyson LaLonde
4812986f58 fix: resolve mypy type annotation issues in storage and telemetry modules
- Add proper type parameters for EmbeddingFunction generics
- Fix ChromaDB query response handling with proper type checking
- Add missing return type annotations to telemetry methods
- Fix trace listener type annotations and imports
- Handle potential None values in nested list indexing
- Improve type safety in RAG and knowledge storage modules
2025-09-04 14:58:28 -04:00
Greyson LaLonde
23c60befd8 fix: resolve additional mypy type annotation issues
- Fixed rag_storage.py embedder type compatibility and query response handling
- Fixed knowledge_storage.py dict type parameters and return types
- Added comprehensive type annotations to telemetry.py methods
- Added type annotations to trace_listener.py event handlers and methods
- Fixed ChromaDB response indexing safety checks
2025-09-04 13:23:57 -04:00
Greyson LaLonde
8dd3493e9c fix: resolve additional mypy type annotation issues
- Fixed file_handler.py PickleHandler type annotations
- Fixed task_events.py None checks before accessing task.fingerprint
- Added type annotations to memory_listener.py event handlers
2025-09-04 13:07:37 -04:00
Greyson LaLonde
9306d889a7 fix: resolve remaining mypy type annotation issues
- Applied proper decorator typing with ParamSpec and typing_extensions.Self
- Fixed event bus decorator to preserve type information
- Added type annotations to BaseEventListener and TraceCollectionListener
- Fixed LongTermMemory.search to handle None return from storage.load
- Resolved all type errors tracked in strict mode
2025-09-04 13:00:11 -04:00
Greyson LaLonde
8354cdf061 fix: add missing type annotations to fix mypy strict mode errors
Added type annotations to 10 files to resolve mypy type checking errors:
- Added return type annotations to methods missing them
- Added parameter type annotations where missing
- Fixed Optional type hints to be explicit
- Removed redundant type cast in crew.py
- Changed _execute_with_timeout return type from str to Any in agent.py

Additional type errors remain in other files throughout the codebase.
2025-09-04 11:41:57 -04:00
Greyson LaLonde
2ba48dd82a fix: add type annotations and exclude tests from mypy
- Add type: ignore for mem0 import
- Fix tool_usage.py cache_function None check
- Change _execute_without_timeout return type to Any
- Add type annotations to multiple functions:
  - add_sources() -> None
  - log() with proper parameter types
  - stop_rpm_counter() -> None
  - EventListener.__new__() -> Self
  - setup_listeners() -> None
  - Memory class __init__ methods -> None
  - TaskEvaluator.__init__() -> None
  - get_skipped_task_output() -> TaskOutput
- Exclude tests directory from mypy checks in pyproject.toml
- Update deprecated typing imports to use built-in types
2025-09-04 11:11:59 -04:00
Greyson LaLonde
0bab041531 fix: resolve remaining mypy type errors
- Fix tool_usage.py: rename result variable to avoid redefinition
- Fix lite_agent.py: import TaskOutput from correct module and add type casts
- Add explicit type annotation for data dict in tool_usage.py
2025-09-04 10:40:33 -04:00
Greyson LaLonde
eed2ffde5f fix: resolve additional mypy type errors
- Fix tool_usage.py: proper type annotations for result and fingerprint metadata
- Fix lite_agent.py: proper Union type for guardrail callable accepting both LiteAgentOutput and TaskOutput
- Add missing return type annotations to task_output_storage_handler.py methods
- Fix crew.py: replace Json generic check with str, remove unused type:ignore and redundant cast
2025-09-03 23:23:36 -04:00
Greyson LaLonde
b6e7311d2d fix: update cache tests to use input_data parameter name
The CacheHandler methods use 'input_data', not 'input', as the parameter name
2025-09-03 23:09:51 -04:00
Greyson LaLonde
90ca02b9dc fix: address mypy type errors in multiple files
- Fix return type and argument handling in cache_tools.py
- Add missing return statements in agent.py
- Fix _inject_date_to_task signature to accept Task object
- Remove unused type:ignore comments in tool_usage.py
- Add type annotations to internal methods in mem0_storage.py
2025-09-03 23:05:07 -04:00
Greyson LaLonde
06d5c3f170 fix: update remaining deprecated type annotations in tests 2025-09-03 22:40:05 -04:00
Greyson LaLonde
b94fbd3d3a fix: improve type annotations across codebase 2025-09-03 22:29:41 -04:00
Greyson LaLonde
43880b49a6 Merge branch 'main' into gl/fix/cache-handler-types-and-imports 2025-09-03 21:15:07 -04:00
Lorenze Jay
f0def350a4 chore: update crewAI and tools dependencies to latest versions (#3444)
- Updated `crewai-tools` dependency from version 0.65.0 to 0.69.0 in `pyproject.toml` and `uv.lock`.
- Bumped crewAI version from 0.175.0 to 0.177.0 in `__init__.py`.
- Updated dependency versions in CLI templates for crew, flow, and tool projects to reflect the new crewAI version.
2025-09-03 17:27:05 -07:00
Lorenze Jay
f4f32b5f7f fix: suppress Pydantic deprecation warnings in initialization (#3443)
* fix: suppress Pydantic deprecation warnings in initialization

- Implemented a function to filter out Pydantic deprecation warnings, enhancing the user experience by preventing unnecessary warning messages during execution.
- Removed the previous warning filter setup to streamline the warning suppression process.
- Updated the User-Agent header formatting for consistency.

* fix type check

* dropped

* fix: update type-checker workflow and suppress warnings

- Updated the Python version matrix in the type-checker workflow to use double quotes for consistency.
- Added the `# type: ignore[assignment]` comment to the warning suppression assignment in `__init__.py` to address type checking issues.
- Ensured that the mypy command in the workflow allows for untyped calls and generics, enhancing type checking flexibility.

* better
2025-09-03 16:36:50 -07:00
Greyson LaLonde
bdfc38ba32 refactor: update CacheHandler imports to use direct path
- Update imports from crewai.agents.cache to crewai.agents.cache.cache_handler
- Remove CacheHandler from agents module __all__ export
2025-09-03 18:18:05 -04:00
Greyson LaLonde
94029017c3 refactor: remove __all__ from internal cache module
- Remove __all__ export as this is an internal module
- Add module docstring describing package purpose
2025-09-03 18:17:19 -04:00
Greyson LaLonde
89df777887 refactor: use absolute imports in parser module
- Import I18N directly from utilities.i18n
2025-09-03 18:16:03 -04:00
Greyson LaLonde
d1fbf24d9e fix: add type annotations to CacheHandler methods
- Replace Optional with union syntax
- Rename input parameter to input_data to avoid shadowing
- Add JSON serialization for dict cache keys
- Add thread-safety TODO note
2025-09-03 18:15:46 -04:00
Tony Kipkemboi
49a5ae0e16 Docs/release 0.175.0 docs (#3441)
* docs(install): note OpenAI SDK requirement openai>=1.13.3 for 0.175.0

* docs(cli): document device-code login and config reset guidance; renumber sections

* docs(flows): document conditional @start and resumable execution semantics

* docs(tasks): move max_retries to deprecation note under attributes table

* docs: provider-neutral RAG client config; entity memory batching; trigger payload note; tracing batch manager

* docs(cli): fix duplicate numbering (renumber Login/API Keys/Configuration sections)
2025-09-03 17:27:11 -04:00
Lucas Gomide
d31ffdbb90 docs: update Enterprise Action Auth Token section docs (#3437)
2025-09-02 17:36:28 -04:00
Greyson LaLonde
4555ada91e fix(ruff): remove Python 3.12+ only rules for compatibility (#3436) 2025-09-02 14:15:25 -04:00
Greyson LaLonde
92d71f7f06 chore: migrate CI workflows to uv and update dev tooling (#3426)
chore(dev): update tooling & CI workflows

- Upgrade ruff, mypy (strict), pre-commit; add hooks, stubs, config consolidation
- Add bandit to dev deps and update uv.lock
- Enhance ruff rules (modern Python style, B006 for mutable defaults)
- Update workflows to use uv, matrix strategy, and changed-file type checking
- Include tests in type checking; fix job names and add summary job for branch protection
2025-09-02 12:35:02 -04:00
ZhangYier
dada9f140f fix: README.md example link 404 (#3432)
Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-09-02 10:29:40 -04:00
Greyson LaLonde
878c1a649a refactor: Move events module to crewai.events (#3425)
refactor(events): relocate events module & update imports

- Move events from utilities/ to top-level events/ with types/, listeners/, utils/ structure
- Update all source/tests/docs to new import paths
- Add backwards compatibility stubs in crewai.utilities.events with deprecation warnings
- Restore test mocks and fix related test imports
2025-09-02 10:06:42 -04:00
Greyson LaLonde
1b1a8fdbf4 fix: replace mutable default arguments with None (#3429)
2025-08-31 18:57:45 -04:00
Lorenze Jay
2633b33afc fix: enhance LLM event handling with task and agent metadata (#3422)
* fix: enhance LLM event handling with task and agent metadata

- Added `from_task` and `from_agent` parameters to LLM event emissions for improved traceability.
- Updated `_send_events_to_backend` method in TraceBatchManager to return status codes for better error handling.
- Modified `CREWAI_BASE_URL` to remove trailing slash for consistency.
- Improved logging and graceful failure handling in event sending process.

* drop print
2025-08-29 13:48:49 -07:00
Greyson LaLonde
e4c4b81e63 chore: refactor parser & constants, improve tools_handler, update tests
- Move parser constants to dedicated module with pre-compiled regex
- Refactor CrewAgentParser to module functions; remove unused params
- Improve tools_handler with instance attributes
- Update tests to use module-level parser functions
2025-08-29 14:35:08 -04:00
Greyson LaLonde
ec1eff02a8 fix: achieve parity between rag package and current impl (#3418)
- Sanitize ChromaDB collection names and use original dir naming
- Add persistent client with file locking to the ChromaDB factory
- Add upsert support to the ChromaDB client
- Suppress ChromaDB deprecation warnings for `model_fields`
- Extract `suppress_logging` into shared `logger_utils`
- Update tests to reflect upsert behavior
- Docs: add additional note
2025-08-28 11:22:36 -04:00
Lorenze Jay
0f1b764c3e chore: update crewAI version and dependencies to 0.175.0 and tools to 0.65.0 (#3417)
* Bump crewAI version from 0.165.1 to 0.175.0 in __init__.py.
* Update tools dependency from 0.62.1 to 0.65.0 in pyproject.toml and uv.lock files.
* Reflect changes in CLI templates for crew, flow, and tool configurations.
2025-08-27 19:33:32 -07:00
Lorenze Jay
6ee9db1d4a fix: enhance PlusAPI and TraceBatchManager with timeout handling and graceful failure logging (#3416)
* Added timeout parameters to PlusAPI trace event methods for improved reliability.
* Updated TraceBatchManager to handle None responses gracefully, logging warnings instead of errors.
* Improved logging messages to provide clearer context during trace batch initialization and event sending failures.
2025-08-27 18:43:03 -07:00
Greyson LaLonde
109de91d08 fix: batch entity memory items to reduce redundant operations (#3409)
* fix: batch save entity memory items to reduce redundant operations

* test: update memory event count after entity batch save implementation
2025-08-27 10:47:20 -04:00
Erika Shorten
92b70e652d Add hybrid search alpha parameter to the docs (#3397)
Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-08-27 10:36:39 -04:00
Heitor Carvalho
fc3f2c49d2 chore: remove auth0 and the need of typing the email on 'crewai login' (#3408)
* Remove the need of typing the email on 'crewai login'

* Remove auth0 constants, update tests
2025-08-27 10:12:57 -04:00
Lucas Gomide
88d2968fd5 chore: add deprecation notices to Task.max_retries (#3379)
2025-08-26 17:24:58 -04:00
Lorenze Jay
7addda9398 Lorenze/better tracing events (#3382)
* feat: implement tool usage limit exception handling

- Introduced `ToolUsageLimitExceeded` exception to manage maximum usage limits for tools.
- Enhanced `CrewStructuredTool` to check and raise this exception when the usage limit is reached.
- Updated `_run` and `_execute` methods to include usage limit checks and handle exceptions appropriately, improving reliability and user feedback.

* feat: enhance PlusAPI and ToolUsage with task metadata

- Removed the `send_trace_batch` method from PlusAPI to streamline the API.
- Added timeout parameters to trace event methods in PlusAPI for improved reliability.
- Updated ToolUsage to include task metadata (task name and ID) in event emissions, enhancing traceability and context during tool usage.
- Refactored event handling in LLM and ToolUsage events to ensure task information is consistently captured.

* feat: enhance memory and event handling with task and agent metadata

- Added task and agent metadata to various memory and event classes, improving traceability and context during memory operations.
- Updated the `ContextualMemory` and `Memory` classes to associate tasks and agents, allowing for better context management.
- Enhanced event emissions in `LLM`, `ToolUsage`, and memory events to include task and agent information, facilitating improved debugging and monitoring.
- Refactored event handling to ensure consistent capture of task and agent details across the system.

* drop

* refactor: clean up unused imports in memory and event modules

- Removed unused TYPE_CHECKING imports from long_term_memory.py to streamline the code.
- Eliminated unnecessary import from memory_events.py, enhancing clarity and maintainability.

* fix memory tests

* fix task_completed payload

* fix: remove unused test agent variable in external memory tests

* refactor: remove unused agent parameter from Memory class save method

- Eliminated the agent parameter from the save method in the Memory class to streamline the code and improve clarity.
- Updated the TraceBatchManager class by moving initialization of attributes into the constructor for better organization and readability.

* refactor: enhance ExecutionState and ReasoningEvent classes with optional task and agent identifiers

- Added optional `current_agent_id` and `current_task_id` attributes to the `ExecutionState` class for better tracking of agent and task states.
- Updated the `from_task` attribute in the `ReasoningEvent` class to use `Optional[Any]` instead of a specific type, improving flexibility in event handling.

* refactor: update ExecutionState class by removing unused agent and task identifiers

- Removed the `current_agent_id` and `current_task_id` attributes from the `ExecutionState` class to simplify the code and enhance clarity.
- Adjusted the import statements to include `Optional` for better type handling.

* refactor: streamline LLM event handling in LiteAgent

- Removed unused LLM event emissions (LLMCallStartedEvent, LLMCallCompletedEvent, LLMCallFailedEvent) from the LiteAgent class to simplify the code and improve performance.
- Adjusted the flow of LLM response handling by eliminating unnecessary event bus interactions, enhancing clarity and maintainability.

* flow ownership and not emitting events when a crew is done

* refactor: remove unused agent parameter from ShortTermMemory save method

- Eliminated the agent parameter from the save method in the ShortTermMemory class to streamline the code and improve clarity.
- This change enhances the maintainability of the memory management system by reducing unnecessary complexity.

* runtype check fix

* fixing tests

* fix lints

* fix: update event assertions in test_llm_emits_event_with_lite_agent

- Adjusted the expected counts for completed and started events in the test to reflect the correct behavior of the LiteAgent.
- Updated assertions for agent roles and IDs to match the expected values after recent changes in event handling.

* fix: update task name assertions in event tests

- Modified assertions in `test_stream_llm_emits_event_with_task_and_agent_info` and `test_llm_emits_event_with_task_and_agent_info` to use `task.description` as a fallback for `task.name`. This ensures that the tests correctly validate the task name even when it is not explicitly set.

* fix: update test assertions for output values and improve readability

- Updated assertions in `test_output_json_dict_hierarchical` to reflect the correct expected score value.
- Enhanced readability of assertions in `test_output_pydantic_to_another_task` and `test_key` by formatting the error messages for clarity.
- These changes ensure that the tests accurately validate the expected outputs and improve overall code quality.

* test fixes

* fix crew_test

* added another fixture

* fix: ensure agent and task assignments in contextual memory are conditional

- Updated the ContextualMemory class to check for the existence of short-term, long-term, external, and extended memory before assigning agent and task attributes. This prevents potential attribute errors when memory types are not initialized.
2025-08-26 09:09:46 -07:00
Greyson LaLonde
4b4a119a9f refactor: simplify rag client initialization (#3401)
* Simplified Qdrant and ChromaDB client initialization
* Refactored factory structure and updated tests accordingly
2025-08-26 08:54:51 -04:00
Greyson LaLonde
869bb115c8 Qdrant RAG Provider Support (#3400)
* Added Qdrant provider support with factory, config, and protocols
* Improved default embeddings and type definitions
* Fixed ChromaDB factory embedding assignment
2025-08-26 08:44:02 -04:00
Greyson LaLonde
7ac482c7c9 feat: rag configuration with optional dependency support (#3394)
### RAG Config System

* Added ChromaDB client creation via config with sensible defaults
* Introduced optional imports and shared RAG config utilities/schema
* Enabled embedding function support with ChromaDB provider integration
* Refactored configs for immutability and stronger type safety
* Removed unused code and expanded test coverage
2025-08-26 00:00:22 -04:00
Greyson LaLonde
2e4bd3f49d feat: qdrant generic client (#3377)
### Qdrant Client

* Add core client with collection, search, and document APIs (sync + async)
* Refactor utilities, types, and vector params (default 384-dim)
* Improve error handling with `ClientMethodMismatchError`
* Add score normalization, async embeddings, and optional `qdrant-client` dep
* Expand tests and type safety throughout
2025-08-25 16:02:25 -04:00
Greyson LaLonde
c02997d956 Add import utilities for optional dependencies (#3389)
2025-08-24 22:57:44 -04:00
Heitor Carvalho
f96b779df5 feat: reset tokens on crewai config reset (#3365)
2025-08-22 16:16:42 -04:00
Greyson LaLonde
842bed4e9c feat: chromadb generic client (#3374)
Add ChromaDB client implementation with async support

- Implement core collection operations (create, get_or_create, delete)
- Add search functionality with cosine similarity scoring
- Include both sync and async method variants
- Add type safety with NamedTuples and TypeGuards
- Extract utility functions to separate modules
- Default to cosine distance metric for text similarity
- Add comprehensive test coverage

TODO:
- l2, ip score calculations are not settled on
2025-08-21 18:18:46 -04:00
Lucas Gomide
1217935b31 feat: add docs about Automation triggers (#3375)
2025-08-20 22:02:47 -04:00
Greyson LaLonde
641c156c17 fix: address flaky tests (#3363)
fix: resolve flaky tests and race conditions in test suite

- Fix telemetry/event tests by patching class methods instead of instances
- Use unique temp files/directories to prevent CI race conditions
- Reset singleton state between tests
- Mock embedchain.Client.setup() to prevent JSON corruption
- Rename test files to test_*.py convention
- Move agent tests to tests/agents directory
- Fix repeated tool usage detection
- Remove database-dependent tools causing initialization errors
2025-08-20 13:34:09 -04:00
Tony Kipkemboi
7fdf9f9290 docs: fix API Reference OpenAPI sources and redirects (#3368)
* docs: fix API Reference OpenAPI sources and redirects; clarify training data usage; add Mermaid diagram; correct CLI usage and notes

* docs(mintlify): use explicit openapi {source, directory} with absolute paths to fix branch deployment routing

* docs(mintlify): add explicit endpoint MDX pages and include in nav; keep OpenAPI auto-gen as fallback

* docs(mintlify): remove OpenAPI Endpoints groups; add localized MDX endpoint pages for pt-BR and ko
2025-08-20 11:55:35 -04:00
Greyson LaLonde
c0d2bf4c12 fix: flow listener resumability for HITL and cyclic flows (#3322)
* fix: flow listener resumability for HITL and cyclic flows

- Add resumption context flag to distinguish HITL resumption from cyclic execution
- Skip method re-execution only during HITL resumption, not for cyclic flows
- Ensure cyclic flows like test_cyclic_flow continue to work correctly

* fix: prevent duplicate execution of conditional start methods in flows

* fix: resolve type error in flow.py line 1040 assignment
2025-08-20 10:06:18 -04:00
231 changed files with 14589 additions and 3766 deletions

View File

@@ -15,8 +15,19 @@ jobs:
- name: Fetch Target Branch
run: git fetch origin $TARGET_BRANCH --depth=1
- name: Install Ruff
run: pip install ruff
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
enable-cache: true
cache-dependency-glob: |
**/pyproject.toml
**/uv.lock
- name: Set up Python
run: uv python install 3.11
- name: Install dependencies
run: uv sync --dev --no-install-project
- name: Get Changed Python Files
id: changed-files
@@ -33,4 +44,4 @@ jobs:
echo "${{ steps.changed-files.outputs.files }}" \
| tr ' ' '\n' \
| grep -v 'src/crewai/cli/templates/' \
| xargs -I{} ruff check "{}"
| xargs -I{} uv run ruff check "{}"

View File

@@ -10,14 +10,20 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
python-version: "3.11.9"
enable-cache: true
cache-dependency-glob: |
**/pyproject.toml
**/uv.lock
- name: Set up Python
run: uv python install 3.11
- name: Install dependencies
run: pip install bandit
run: uv sync --dev --no-install-project
- name: Run Bandit
run: bandit -c pyproject.toml -r src/ -ll
run: uv run bandit -c pyproject.toml -r src/ -ll

View File

@@ -24,7 +24,7 @@ jobs:
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v3
uses: astral-sh/setup-uv@v6
with:
enable-cache: true
cache-dependency-glob: |

View File

@@ -6,21 +6,78 @@ permissions:
contents: write
jobs:
type-checker:
type-checker-matrix:
name: type-checker (${{ matrix.python-version }})
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13"]
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.11.9"
fetch-depth: 0 # Fetch all history for proper diff
- name: Install Requirements
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
enable-cache: true
cache-dependency-glob: |
**/pyproject.toml
**/uv.lock
- name: Set up Python ${{ matrix.python-version }}
run: uv python install ${{ matrix.python-version }}
- name: Install dependencies
run: uv sync --dev --no-install-project
- name: Get changed Python files
id: changed-files
run: |
pip install mypy
# Get the list of changed Python files compared to the base branch
echo "Fetching changed files..."
git diff --name-only --diff-filter=ACMRT origin/${{ github.base_ref }}...HEAD -- '*.py' > changed_files.txt
- name: Run type checks
run: mypy src
# Filter for files in src/ directory only (excluding tests/)
grep -E "^src/" changed_files.txt > filtered_changed_files.txt || true
# Check if there are any changed files
if [ -s filtered_changed_files.txt ]; then
echo "Changed Python files in src/:"
cat filtered_changed_files.txt
echo "has_changes=true" >> $GITHUB_OUTPUT
# Convert newlines to spaces for mypy command
echo "files=$(cat filtered_changed_files.txt | tr '\n' ' ')" >> $GITHUB_OUTPUT
else
echo "No Python files changed in src/"
echo "has_changes=false" >> $GITHUB_OUTPUT
fi
- name: Run type checks on changed files
if: steps.changed-files.outputs.has_changes == 'true'
run: |
echo "Running mypy on changed files with Python ${{ matrix.python-version }}..."
uv run mypy ${{ steps.changed-files.outputs.files }}
- name: No files to check
if: steps.changed-files.outputs.has_changes == 'false'
run: echo "No Python files in src/ were modified - skipping type checks"
# Summary job to provide single status for branch protection
type-checker:
name: type-checker
runs-on: ubuntu-latest
needs: type-checker-matrix
if: always()
steps:
- name: Check matrix results
run: |
if [ "${{ needs.type-checker-matrix.result }}" == "success" ] || [ "${{ needs.type-checker-matrix.result }}" == "skipped" ]; then
echo "✅ All type checks passed"
else
echo "❌ Type checks failed"
exit 1
fi

View File

@@ -1,7 +1,16 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.2
rev: v0.12.11
hooks:
- id: ruff
args: ["--fix"]
args: ["--config", "pyproject.toml"]
- id: ruff-format
args: ["--config", "pyproject.toml"]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.17.1
hooks:
- id: mypy
args: ["--strict", "--exclude", "src/crewai/cli/templates"]
files: ^src/
exclude: ^tests/

View File

@@ -1,4 +0,0 @@
exclude = [
"templates",
"__init__.py",
]

View File

@@ -418,10 +418,10 @@ Choose CrewAI to easily build powerful, adaptable, and production-ready AI autom
You can test different real life examples of AI crews in the [CrewAI-examples repo](https://github.com/crewAIInc/crewAI-examples?tab=readme-ov-file):
- [Landing Page Generator](https://github.com/crewAIInc/crewAI-examples/tree/main/landing_page_generator)
- [Landing Page Generator](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/landing_page_generator)
- [Having Human input on the execution](https://docs.crewai.com/how-to/Human-Input-on-Execution)
- [Trip Planner](https://github.com/crewAIInc/crewAI-examples/tree/main/trip_planner)
- [Stock Analysis](https://github.com/crewAIInc/crewAI-examples/tree/main/stock_analysis)
- [Trip Planner](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/trip_planner)
- [Stock Analysis](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/stock_analysis)
### Quick Tutorial
@@ -429,19 +429,19 @@ You can test different real life examples of AI crews in the [CrewAI-examples re
### Write Job Descriptions
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/job-posting) or watch a video below:
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/job-posting) or watch a video below:
[![Jobs postings](https://img.youtube.com/vi/u98wEMz-9to/maxresdefault.jpg)](https://www.youtube.com/watch?v=u98wEMz-9to "Jobs postings")
### Trip Planner
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/trip_planner) or watch a video below:
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/trip_planner) or watch a video below:
[![Trip Planner](https://img.youtube.com/vi/xis7rWp-hjs/maxresdefault.jpg)](https://www.youtube.com/watch?v=xis7rWp-hjs "Trip Planner")
### Stock Analysis
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/stock_analysis) or watch a video below:
[Check out code for this example](https://github.com/crewAIInc/crewAI-examples/tree/main/crews/stock_analysis) or watch a video below:
[![Stock Analysis](https://img.youtube.com/vi/e0Uj4yWdaAg/maxresdefault.jpg)](https://www.youtube.com/watch?v=e0Uj4yWdaAg "Stock Analysis")

View File

@@ -320,6 +320,7 @@
"en/enterprise/guides/update-crew",
"en/enterprise/guides/enable-crew-studio",
"en/enterprise/guides/azure-openai-setup",
"en/enterprise/guides/automation-triggers",
"en/enterprise/guides/hubspot-trigger",
"en/enterprise/guides/react-component-export",
"en/enterprise/guides/salesforce-trigger",
@@ -658,6 +659,7 @@
"pt-BR/enterprise/guides/update-crew",
"pt-BR/enterprise/guides/enable-crew-studio",
"pt-BR/enterprise/guides/azure-openai-setup",
"pt-BR/enterprise/guides/automation-triggers",
"pt-BR/enterprise/guides/hubspot-trigger",
"pt-BR/enterprise/guides/react-component-export",
"pt-BR/enterprise/guides/salesforce-trigger",
@@ -1007,6 +1009,7 @@
"ko/enterprise/guides/update-crew",
"ko/enterprise/guides/enable-crew-studio",
"ko/enterprise/guides/azure-openai-setup",
"ko/enterprise/guides/automation-triggers",
"ko/enterprise/guides/hubspot-trigger",
"ko/enterprise/guides/react-component-export",
"ko/enterprise/guides/salesforce-trigger",

View File

@@ -282,7 +282,25 @@ Watch this video tutorial for a step-by-step demonstration of deploying your cre
allowfullscreen
></iframe>
### 11. API Keys
### 12. Login
Authenticate with CrewAI Enterprise using a secure device code flow (no email entry required).
```shell Terminal
crewai login
```
What happens:
- A verification URL and short code are displayed in your terminal
- Your browser opens to the verification URL
- Enter/confirm the code to complete authentication
Notes:
- The OAuth2 provider and domain are configured via `crewai config` (defaults use `login.crewai.com`)
- After successful login, the CLI also attempts to authenticate to the Tool Repository automatically
- If you reset your configuration, run `crewai login` again to re-authenticate
### 13. API Keys
When running the `crewai create crew` command, the CLI will show you a list of available LLM providers to choose from, followed by model selection for your chosen provider.
@@ -310,7 +328,7 @@ See the following link for each provider's key name:
* [LiteLLM Providers](https://docs.litellm.ai/docs/providers)
### 12. Configuration Management
### 14. Configuration Management
Manage CLI configuration settings for CrewAI.
@@ -385,6 +403,10 @@ Reset all configuration to defaults:
crewai config reset
```
<Tip>
After resetting configuration, re-run `crewai login` to authenticate again.
</Tip>
<Note>
Configuration settings are stored in `~/.config/crewai/settings.json`. Some settings like organization name and UUID are read-only and managed through authentication and organization commands. Tool repository related settings are hidden and cannot be set directly by users.
</Note>

View File

@@ -44,12 +44,12 @@ To create a custom event listener, you need to:
Here's a simple example of a custom event listener class:
```python
from crewai.utilities.events import (
from crewai.events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
AgentExecutionCompletedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def __init__(self):
@@ -146,7 +146,7 @@ my_project/
```python
# my_custom_listener.py
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
# ... import events ...
class MyCustomListener(BaseEventListener):
@@ -279,7 +279,7 @@ Additional fields vary by event type. For example, `CrewKickoffCompletedEvent` i
For temporary event handling (useful for testing or specific operations), you can use the `scoped_handlers` context manager:
```python
from crewai.utilities.events import crewai_event_bus, CrewKickoffStartedEvent
from crewai.events import crewai_event_bus, CrewKickoffStartedEvent
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStartedEvent)

View File

@@ -97,7 +97,13 @@ The state's unique ID and stored data can be useful for tracking flow executions
### @start()
The `@start()` decorator is used to mark a method as the starting point of a Flow. When a Flow is started, all the methods decorated with `@start()` are executed in parallel. You can have multiple start methods in a Flow, and they will all be executed when the Flow is started.
The `@start()` decorator marks entry points for a Flow. You can:
- Declare multiple unconditional starts: `@start()`
- Gate a start on a prior method or router label: `@start("method_or_label")`
- Provide a callable condition to control when a start should fire
All satisfied `@start()` methods will execute (often in parallel) when the Flow begins or resumes.
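As a minimal sketch (the method names here are illustrative, not from the docs):
```python
from crewai.flow.flow import Flow, start, listen

class ReportFlow(Flow):
    @start()  # unconditional: fires whenever the flow begins
    def fetch(self):
        ...

    @start("fetch")  # conditional: gated on `fetch` (or a router label)
    def enrich(self):
        ...

    @listen(enrich)
    def publish(self, result):
        ...
```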
### @listen()

View File

@@ -24,6 +24,41 @@ For file-based Knowledge Sources, make sure to place your files in a `knowledge`
Also, use relative paths from the `knowledge` directory when creating the source.
</Tip>
### Vector store (RAG) client configuration
CrewAI exposes a provider-neutral RAG client abstraction for vector stores. The default provider is ChromaDB, and Qdrant is supported as well. You can switch providers using configuration utilities.
Supported today:
- ChromaDB (default)
- Qdrant
```python Code
from crewai.rag.config.utils import set_rag_config, get_rag_client, clear_rag_config
# ChromaDB (default)
from crewai.rag.chromadb.config import ChromaDBConfig
set_rag_config(ChromaDBConfig())
chromadb_client = get_rag_client()
# Qdrant
from crewai.rag.qdrant.config import QdrantConfig
set_rag_config(QdrantConfig())
qdrant_client = get_rag_client()
# Example operations (same API for any provider)
client = qdrant_client # or chromadb_client
client.create_collection(collection_name="docs")
client.add_documents(
    collection_name="docs",
    documents=[{"id": "1", "content": "CrewAI enables collaborative AI agents."}],
)
results = client.search(collection_name="docs", query="collaborative agents", limit=3)
clear_rag_config() # optional reset
```
This RAG client is separate from Knowledge's built-in storage. Use it when you need direct vector-store control or custom retrieval pipelines.
### Basic String Knowledge Example
```python Code
@@ -681,11 +716,11 @@ CrewAI emits events during the knowledge retrieval process that you can listen f
#### Example: Monitoring Knowledge Retrieval
```python
from crewai.utilities.events import (
from crewai.events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
BaseEventListener,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
class KnowledgeMonitorListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):

View File

@@ -733,10 +733,10 @@ CrewAI supports streaming responses from LLMs, allowing your application to rece
CrewAI emits events for each chunk received during streaming:
```python
from crewai.utilities.events import (
from crewai.events import (
LLMStreamChunkEvent
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):
@@ -758,8 +758,8 @@ CrewAI supports streaming responses from LLMs, allowing your application to rece
```python
from crewai import LLM, Agent, Task, Crew
from crewai.utilities.events import LLMStreamChunkEvent
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import LLMStreamChunkEvent
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):

View File

@@ -738,6 +738,17 @@ print(f"OpenAI: {openai_time:.2f}s")
print(f"Ollama: {ollama_time:.2f}s")
```
### Entity Memory batching behavior
Entity Memory supports batching when saving multiple entities at once. When you pass a list of `EntityMemoryItem`, the system:
- Emits a single `MemorySaveStartedEvent` with `entity_count`
- Saves each entity internally, collecting any partial errors
- Emits `MemorySaveCompletedEvent` with aggregate metadata (saved count, errors)
- Raises a partial-save exception (including counts) if some entities failed
This improves performance and observability when writing many entities in one operation.
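As a rough sketch of the batched path (this assumes an already-initialized `EntityMemory` instance named `entity_memory`; the item fields follow `EntityMemoryItem`):
```python
from crewai.memory.entity.entity_memory_item import EntityMemoryItem

items = [
    EntityMemoryItem(
        name="CrewAI",
        type="Framework",
        description="Multi-agent orchestration framework.",
        relationships="used by the research crew",
    ),
    EntityMemoryItem(
        name="Qdrant",
        type="VectorStore",
        description="Vector database supported as a RAG provider.",
        relationships="alternative to ChromaDB",
    ),
]

# Passing the full list emits one MemorySaveStartedEvent (entity_count=2)
# and one aggregate MemorySaveCompletedEvent, instead of one pair per item.
entity_memory.save(items)
```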
## 2. External Memory
External Memory provides a standalone memory system that operates independently from the crew's built-in memory. This is ideal for specialized memory providers or cross-application memory sharing.
@@ -1041,8 +1052,8 @@ CrewAI emits the following memory-related events:
Track memory operation timing to optimize your application:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent
)
@@ -1076,8 +1087,8 @@ memory_monitor = MemoryPerformanceMonitor()
Log memory operations for debugging and insights:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemorySaveStartedEvent,
MemoryQueryStartedEvent,
MemoryRetrievalCompletedEvent
@@ -1117,8 +1128,8 @@ memory_logger = MemoryLogger()
Capture and respond to memory errors:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemorySaveFailedEvent,
MemoryQueryFailedEvent
)
@@ -1167,8 +1178,8 @@ error_tracker = MemoryErrorTracker(notify_email="admin@example.com")
Memory events can be forwarded to analytics and monitoring platforms to track performance metrics, detect anomalies, and visualize memory usage patterns:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent
)

View File

@@ -59,6 +59,12 @@ crew = Crew(
| **Output Pydantic** _(optional)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | A Pydantic model for task output. |
| **Callback** _(optional)_ | `callback` | `Optional[Any]` | Function/object to be executed after task completion. |
| **Guardrail** _(optional)_ | `guardrail` | `Optional[Callable]` | Function to validate task output before proceeding to next task. |
| **Guardrail Max Retries** _(optional)_ | `guardrail_max_retries` | `Optional[int]` | Maximum number of retries when guardrail validation fails. Defaults to 3. |
<Note type="warning" title="Deprecated: max_retries">
The task attribute `max_retries` is deprecated and will be removed in v1.0.0.
Use `guardrail_max_retries` instead to control retry attempts when a guardrail fails.
</Note>
## Creating Tasks
@@ -431,7 +437,7 @@ When a guardrail returns `(False, error)`:
2. The agent attempts to fix the issue
3. The process repeats until:
- The guardrail returns `(True, result)`
- Maximum retries are reached
- Maximum retries are reached (`guardrail_max_retries`)
Example with retry handling:
```python Code
@@ -452,7 +458,7 @@ task = Task(
expected_output="A valid JSON object",
agent=analyst,
guardrail=validate_json_output,
max_retries=3 # Limit retry attempts
guardrail_max_retries=3 # Limit retry attempts
)
```

View File

@@ -59,7 +59,7 @@ Before using Authentication Integrations, ensure you have:
3. Click **Connect** on your desired service from the Authentication Integrations section
4. Complete the OAuth authentication flow
5. Grant necessary permissions for your use case
6. Get your Enterprise Token from your [CrewAI Enterprise](https://app.crewai.com) account page - https://app.crewai.com/crewai_plus/settings/account
6. All set! Get your Enterprise Token from the **Integration** tab in your [CrewAI Enterprise](https://app.crewai.com) account
<Frame>
![Integrations](/images/enterprise/enterprise_action_auth_token.png)

View File

@@ -141,6 +141,16 @@ Traces are invaluable for troubleshooting issues with your crews:
</Step>
</Steps>
## Performance and batching
CrewAI batches trace uploads to reduce overhead on high-volume runs:
- A `TraceBatchManager` buffers events and sends them in batches via the Plus API client
- Reduces network chatter and improves reliability on flaky connections
- Automatically enabled in the default trace listener; no configuration needed
This yields more stable tracing under load while preserving detailed task/agent telemetry.
<Card title="Need Help?" icon="headset" href="mailto:support@crewai.com">
Contact our support team for assistance with trace analysis or any other CrewAI Enterprise features.
</Card>

View File

@@ -0,0 +1,178 @@
---
title: "Automation Triggers"
description: "Automatically execute your CrewAI workflows when specific events occur in connected integrations"
icon: "bolt"
---
Automation triggers enable you to automatically run your CrewAI deployments when specific events occur in your connected integrations, creating powerful event-driven workflows that respond to real-time changes in your business systems.
## Overview
With automation triggers, you can:
- **Respond to real-time events** - Automatically execute workflows when specific conditions are met
- **Integrate with external systems** - Connect with platforms like Gmail, Outlook, OneDrive, JIRA, Slack, Stripe and more
- **Scale your automation** - Handle high-volume events without manual intervention
- **Maintain context** - Access trigger data within your crews and flows
## Managing Automation Triggers
### Viewing Available Triggers
To access and manage your automation triggers:
1. Navigate to your deployment in the CrewAI dashboard
2. Click on the **Triggers** tab to view all available trigger integrations
<Frame>
<img src="/images/enterprise/list-available-triggers.png" alt="List of available automation triggers" />
</Frame>
This view shows all the trigger integrations available for your deployment, along with their current connection status.
### Enabling and Disabling Triggers
Each trigger can be easily enabled or disabled using the toggle switch:
<Frame>
<img src="/images/enterprise/trigger-selected.png" alt="Enable or disable triggers with toggle" />
</Frame>
- **Enabled (blue toggle)**: The trigger is active and will automatically execute your deployment when the specified events occur
- **Disabled (gray toggle)**: The trigger is inactive and will not respond to events
Simply click the toggle to change the trigger state. Changes take effect immediately.
### Monitoring Trigger Executions
Track the performance and history of your triggered executions:
<Frame>
<img src="/images/enterprise/list-executions.png" alt="List of executions triggered by automation" />
</Frame>
## Building Automation
Before building your automation, it's helpful to understand the structure of trigger payloads that your crews and flows will receive.
### Payload Samples Repository
We maintain a comprehensive repository with sample payloads from various trigger sources to help you build and test your automations:
**🔗 [CrewAI Enterprise Trigger Payload Samples](https://github.com/crewAIInc/crewai-enterprise-trigger-payload-samples)**
This repository contains:
- **Real payload examples** from different trigger sources (Gmail, Google Drive, etc.)
- **Payload structure documentation** showing the format and available fields
### Triggers with Crew
Your existing crew definitions work seamlessly with triggers; you just need a task that parses the received payload:
```python
from crewai import Agent, Task
from crewai.project import CrewBase, agent, task

@CrewBase
class MyAutomatedCrew:
    @agent
    def researcher(self) -> Agent:
        return Agent(
            config=self.agents_config['researcher'],
        )

    @task
    def parse_trigger_payload(self) -> Task:
        return Task(
            config=self.tasks_config['parse_trigger_payload'],
            agent=self.researcher(),
        )

    @task
    def analyze_trigger_content(self) -> Task:
        return Task(
            config=self.tasks_config['analyze_trigger_data'],
            agent=self.researcher(),
        )
```
The crew automatically receives the trigger payload and can access it through the standard CrewAI context mechanisms.
<Note>
Crew and Flow inputs can include `crewai_trigger_payload`. CrewAI automatically injects this payload:
- Tasks: appended to the first task's description by default ("Trigger Payload: {crewai_trigger_payload}")
- Control via `allow_crewai_trigger_context`: set `True` to always inject, `False` to never inject
- Flows: any `@start()` method that accepts a `crewai_trigger_payload` parameter will receive it
</Note>
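A hedged sketch of the task-level control (the agent and task definitions below are illustrative):
```python
from crewai import Agent, Task

analyst = Agent(
    role="Event Analyst",
    goal="Summarize incoming trigger events",
    backstory="Handles event-driven runs.",
)

# Always inject the trigger payload into this task's description.
summarize = Task(
    description="Summarize the incoming event.",
    expected_output="A short summary of the event.",
    agent=analyst,
    allow_crewai_trigger_context=True,
)

# Never inject the payload here, even if this is the first task.
digest = Task(
    description="Produce a weekly digest.",
    expected_output="A digest document.",
    agent=analyst,
    allow_crewai_trigger_context=False,
)
```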
### Integration with Flows
For flows, you have more control over how trigger data is handled:
#### Accessing Trigger Payload
Any `@start()` method in your flow can accept an additional parameter called `crewai_trigger_payload`:
```python
from crewai.flow import Flow, start, listen

class MyAutomatedFlow(Flow):
    @start()
    def handle_trigger(self, crewai_trigger_payload: dict | None = None):
        """
        This start method can receive trigger data
        """
        if crewai_trigger_payload:
            # Process the trigger data
            trigger_id = crewai_trigger_payload.get('id')
            event_data = crewai_trigger_payload.get('payload', {})

            # Store in flow state for use by other methods
            self.state.trigger_id = trigger_id
            self.state.trigger_data = event_data

            return event_data

        # Handle manual execution
        return None

    @listen(handle_trigger)
    def process_data(self, trigger_data):
        """
        Process the data from the trigger
        """
        # ... process the trigger
```
#### Triggering Crews from Flows
When kicking off a crew within a flow that was triggered, pass the trigger payload through as-is:
```python
@start()
def delegate_to_crew(self, crewai_trigger_payload: dict | None = None):
    """
    Delegate processing to a specialized crew
    """
    crew = MySpecializedCrew()

    # Pass the trigger payload to the crew
    result = crew.crew().kickoff(
        inputs={
            'a_custom_parameter': "custom_value",
            'crewai_trigger_payload': crewai_trigger_payload,
        },
    )
    return result
```
## Troubleshooting
**Trigger not firing:**
- Verify the trigger is enabled
- Check integration connection status
**Execution failures:**
- Check the execution logs for error details
- If you are developing locally, make sure the inputs include the `crewai_trigger_payload` parameter with the correct payload (see the sketch below)
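For example, a local test run with a synthetic payload might look like this (the payload shape is illustrative; see the samples repository above, and this assumes your crew class exposes the usual `crew()` method):
```python
# Simulate a triggered execution locally by supplying a synthetic payload.
result = MyAutomatedCrew().crew().kickoff(
    inputs={
        "crewai_trigger_payload": {
            "id": "evt_123",  # hypothetical trigger event id
            "payload": {"subject": "New invoice received"},
        }
    }
)
```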
Automation triggers transform your CrewAI deployments into responsive, event-driven systems that can seamlessly integrate with your existing business processes and tools.

View File

@@ -348,6 +348,31 @@ class SelectivePersistFlow(Flow):
## Advanced State Patterns
### Conditional starts and resumable execution
Flows support conditional `@start()` and resumable execution for HITL/cyclic scenarios:
```python
from crewai.flow.flow import Flow, start, listen, and_

class ResumableFlow(Flow):
    @start()  # unconditional start
    def init(self):
        ...

    # Conditional start: run after "init" or an external trigger name
    @start("init")
    def maybe_begin(self):
        ...

    @listen(and_(init, maybe_begin))
    def proceed(self):
        ...
```
- Conditional `@start()` accepts a method name, a router label, or a callable condition.
- During resume, listeners continue from prior checkpoints; cycle/router branches honor resumption flags.
### State-Based Conditional Logic
You can use state to implement complex conditional logic in your flows:

View File

@@ -30,6 +30,12 @@ Watch this video tutorial for a step-by-step demonstration of the installation p
If you need to update Python, visit [python.org/downloads](https://python.org/downloads)
</Note>
<Note>
**OpenAI SDK Requirement**
CrewAI 0.175.0 requires `openai >= 1.13.3`. If you manage dependencies yourself, ensure your environment satisfies this constraint to avoid import/runtime issues.
</Note>
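A quick way to verify your environment meets the constraint (a minimal check using the standard library):
```python
from importlib.metadata import version

# Should print 1.13.3 or newer for CrewAI 0.175.0.
print(version("openai"))
```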
CrewAI uses `uv` as its dependency management and package handling tool. It simplifies project setup and execution, offering a seamless experience.
If you haven't installed `uv` yet, follow **step 1** to quickly set it up on your system; otherwise, skip to **step 2**.

View File

@@ -1,13 +1,13 @@
---
title: Weaviate Vector Search
description: The `WeaviateVectorSearchTool` is designed to search a Weaviate vector database for semantically similar documents.
description: The `WeaviateVectorSearchTool` is designed to search a Weaviate vector database for semantically similar documents using hybrid search.
icon: network-wired
---
## Overview
The `WeaviateVectorSearchTool` is specifically crafted for conducting semantic searches within documents stored in a Weaviate vector database. This tool allows you to find semantically similar documents to a given query, leveraging the power of vector embeddings for more accurate and contextually relevant search results.
The `WeaviateVectorSearchTool` is specifically crafted for conducting semantic searches within documents stored in a Weaviate vector database. This tool allows you to find semantically similar documents to a given query, leveraging the power of vector and keyword search for more accurate and contextually relevant search results.
[Weaviate](https://weaviate.io/) is a vector database that stores and queries vector embeddings, enabling semantic search capabilities.
@@ -39,6 +39,7 @@ from crewai_tools import WeaviateVectorSearchTool
tool = WeaviateVectorSearchTool(
collection_name='example_collections',
limit=3,
alpha=0.75,
weaviate_cluster_url="https://your-weaviate-cluster-url.com",
weaviate_api_key="your-weaviate-api-key",
)
@@ -63,6 +64,7 @@ The `WeaviateVectorSearchTool` accepts the following parameters:
- **weaviate_cluster_url**: Required. The URL of the Weaviate cluster.
- **weaviate_api_key**: Required. The API key for the Weaviate cluster.
- **limit**: Optional. The number of results to return. Default is `3`.
- **alpha**: Optional. Controls the weighting between vector and keyword (BM25) search: `alpha = 0` uses BM25 only, `alpha = 1` uses vector search only. Default is `0.75`.
- **vectorizer**: Optional. The vectorizer to use. If not provided, it will use `text2vec_openai` with the `nomic-embed-text` model.
- **generative_model**: Optional. The generative model to use. If not provided, it will use OpenAI's `gpt-4o`.
@@ -78,6 +80,7 @@ from weaviate.classes.config import Configure
tool = WeaviateVectorSearchTool(
collection_name='example_collections',
limit=3,
alpha=0.75,
vectorizer=Configure.Vectorizer.text2vec_openai(model="nomic-embed-text"),
generative_model=Configure.Generative.openai(model="gpt-4o-mini"),
weaviate_cluster_url="https://your-weaviate-cluster-url.com",
@@ -128,6 +131,7 @@ with test_docs.batch.dynamic() as batch:
tool = WeaviateVectorSearchTool(
collection_name='example_collections',
limit=3,
alpha=0.75,
weaviate_cluster_url="https://your-weaviate-cluster-url.com",
weaviate_api_key="your-weaviate-api-key",
)
@@ -145,6 +149,7 @@ from crewai_tools import WeaviateVectorSearchTool
weaviate_tool = WeaviateVectorSearchTool(
collection_name='example_collections',
limit=3,
alpha=0.75,
weaviate_cluster_url="https://your-weaviate-cluster-url.com",
weaviate_api_key="your-weaviate-api-key",
)

Binary image files not shown: one image updated (54 KiB → 72 KiB) and three images added (142 KiB, 330 KiB, 133 KiB).

View File

@@ -44,12 +44,12 @@ Prompt Tracing을 통해 다음과 같은 작업이 가능합니다:
아래는 커스텀 이벤트 리스너 클래스의 간단한 예시입니다:
```python
from crewai.utilities.events import (
from crewai.events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
AgentExecutionCompletedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def __init__(self):
@@ -146,7 +146,7 @@ my_project/
```python
# my_custom_listener.py
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
# ... import events ...
class MyCustomListener(BaseEventListener):
@@ -279,7 +279,7 @@ CrewAI provides a variety of events you can listen
If you need temporary event handling (useful for testing or specific operations), you can use the `scoped_handlers` context manager:
```python
from crewai.utilities.events import crewai_event_bus, CrewKickoffStartedEvent
from crewai.events import crewai_event_bus, CrewKickoffStartedEvent
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStartedEvent)
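```

A complete, minimal sketch of scoped handling (the `(source, event)` handler signature is assumed from the listener examples above):

```python
from crewai.events import crewai_event_bus, CrewKickoffStartedEvent

with crewai_event_bus.scoped_handlers():
    @crewai_event_bus.on(CrewKickoffStartedEvent)
    def on_kickoff(source, event):
        print("Crew kickoff started")
    # ... kick off a crew here; the handler is active only inside this block
```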

View File

@@ -683,11 +683,11 @@ CrewAI emits events during the knowledge retrieval process that you
#### Example: Monitoring Knowledge Retrieval
```python
from crewai.utilities.events import (
from crewai.events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
BaseEventListener,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
class KnowledgeMonitorListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):
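```

A minimal sketch completing the listener above (the printed event fields are assumptions, not confirmed attributes):

```python
from crewai.events import (
    BaseEventListener,
    KnowledgeRetrievalStartedEvent,
    KnowledgeRetrievalCompletedEvent,
)

class KnowledgeMonitorListener(BaseEventListener):
    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(KnowledgeRetrievalStartedEvent)
        def on_retrieval_started(source, event):
            print("Knowledge retrieval started")

        @crewai_event_bus.on(KnowledgeRetrievalCompletedEvent)
        def on_retrieval_completed(source, event):
            # `query` is an assumed field name on the completed event
            print(f"Knowledge retrieved for query: {getattr(event, 'query', '<unknown>')}")

knowledge_monitor = KnowledgeMonitorListener()
```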

View File

@@ -731,10 +731,10 @@ CrewAI supports streaming responses from LLMs, so your application can
CrewAI emits an event for each chunk received while streaming:
```python
from crewai.utilities.events import (
from crewai.events import (
LLMStreamChunkEvent
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):
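```

A minimal end-to-end sketch of a chunk printer (the `chunk` attribute is assumed from the streaming docs):

```python
from crewai.events import BaseEventListener, LLMStreamChunkEvent

class StreamChunkPrinter(BaseEventListener):
    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(LLMStreamChunkEvent)
        def on_chunk(source, event):
            # Print each streamed chunk as it arrives, without a trailing newline
            print(getattr(event, "chunk", ""), end="", flush=True)

stream_printer = StreamChunkPrinter()
```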
@@ -756,8 +756,8 @@ CrewAI supports streaming responses from LLMs, so your application can
```python
from crewai import LLM, Agent, Task, Crew
from crewai.utilities.events import LLMStreamChunkEvent
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import LLMStreamChunkEvent
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):

View File

@@ -985,8 +985,8 @@ CrewAI emits the following memory-related events:
Track the timing of memory operations to optimize your application:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent
)
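```

A minimal timing-listener sketch consistent with the `MemoryPerformanceMonitor` referenced below (the `*_time_ms` fields are assumptions):

```python
from crewai.events import (
    BaseEventListener,
    MemoryQueryCompletedEvent,
    MemorySaveCompletedEvent,
)

class MemoryPerformanceMonitor(BaseEventListener):
    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(MemoryQueryCompletedEvent)
        def on_query_completed(source, event):
            print(f"Memory query took {getattr(event, 'query_time_ms', 0.0):.1f} ms")

        @crewai_event_bus.on(MemorySaveCompletedEvent)
        def on_save_completed(source, event):
            print(f"Memory save took {getattr(event, 'save_time_ms', 0.0):.1f} ms")

memory_monitor = MemoryPerformanceMonitor()
```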
@@ -1020,8 +1020,8 @@ memory_monitor = MemoryPerformanceMonitor()
Log memory operations for debugging and insights:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemorySaveStartedEvent,
MemoryQueryStartedEvent,
MemoryRetrievalCompletedEvent
@@ -1061,8 +1061,8 @@ memory_logger = MemoryLogger()
Capture and respond to memory errors:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemorySaveFailedEvent,
MemoryQueryFailedEvent
)
@@ -1111,8 +1111,8 @@ error_tracker = MemoryErrorTracker(notify_email="admin@example.com")
Memory events can be forwarded to analytics and monitoring platforms to track performance metrics, detect anomalies, and visualize memory usage patterns:
```python
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events import (
from crewai.events import (
BaseEventListener,
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent
)

View File

@@ -59,6 +59,7 @@ crew = Crew(
| **Pydantic Output** _(optional)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | A Pydantic model for the task output. |
| **Callback** _(optional)_ | `callback` | `Optional[Any]` | A function/object to be executed after task completion. |
| **Guardrail** _(optional)_ | `guardrail` | `Optional[Callable]` | A function that validates the task output before proceeding to the next task. |
| **Guardrail Max Retries** _(optional)_ | `guardrail_max_retries` | `Optional[int]` | Maximum number of retries when guardrail validation fails. Default is 3. |
## Creating Tasks
@@ -448,7 +449,7 @@ task = Task(
expected_output="A valid JSON object",
agent=analyst,
guardrail=validate_json_output,
max_retries=3 # Limit retry attempts
guardrail_max_retries=3 # Limit retry attempts
)
```
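For reference, a minimal guardrail sketch following the `(bool, Any)` return contract from the table above (`validate_json_output` itself is assumed from the surrounding guide):

```python
import json

def validate_json_output(output):
    """Return (True, output) when the task produced valid JSON, else (False, reason)."""
    try:
        json.loads(output.raw)  # TaskOutput.raw is assumed to hold the raw text
        return (True, output)
    except (ValueError, TypeError) as e:
        return (False, f"Invalid JSON: {e}")
```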
@@ -899,4 +900,4 @@ except RuntimeError as e:
Tasks are the driving force behind the actions of CrewAI agents.
By properly defining tasks and their outputs, you lay the foundation for agents to work effectively, either independently or as a collaborative unit.
Equipping tasks with the right tools, understanding the execution process, and following robust validation practices are essential to maximizing CrewAI's potential.
This ensures that agents are well prepared for their assignments and that tasks are carried out as intended.
This ensures that agents are well prepared for their assignments and that tasks are carried out as intended.

View File

@@ -58,7 +58,7 @@ Before using Authentication Integrations, make sure you have the following ready
3. Click the **Connect** button for the desired service in the Authentication Integrations section.
4. Complete the OAuth authentication flow.
5. Grant the permissions required for your use case.
6. Get your Enterprise Token from your [CrewAI Enterprise](https://app.crewai.com) account page - https://app.crewai.com/crewai_plus/settings/account
6. Done! Get your Enterprise Token from the **Integration** tab in [CrewAI Enterprise](https://app.crewai.com).
<Frame>
![Integrations](/images/enterprise/enterprise_action_auth_token.png)
@@ -176,4 +176,4 @@ You can deploy the crew and scope each integration to specific users
<Card title="Need help?" icon="headset" href="mailto:support@crewai.com">
Contact our support team for assistance with setting up integrations or troubleshooting.
</Card>
</Card>

View File

@@ -0,0 +1,171 @@
---
title: "자동화 트리거"
description: "연결된 통합에서 특정 이벤트가 발생할 때 CrewAI 워크플로우를 자동으로 실행합니다"
icon: "bolt"
---
Automation triggers let you automatically execute your CrewAI deployments when specific events occur in your connected integrations, creating powerful event-driven workflows that react to real-time changes in your business systems.
## Overview
With automation triggers, you can:
- **Respond to real-time events** - Execute workflows automatically when specific conditions are met
- **Integrate with external systems** - Connect with platforms such as Gmail, Outlook, OneDrive, JIRA, Slack, Stripe, and more
- **Scale your automation** - Handle high-volume events without manual intervention
- **Maintain context** - Access trigger data inside your crews and flows
## Managing Automation Triggers
### Viewing Available Triggers
To access and manage your automation triggers:
1. Navigate to your deployment in the CrewAI dashboard
2. Click the **Triggers** tab to view all available trigger integrations
<Frame>
<img src="/images/enterprise/list-available-triggers.png" alt="사용 가능한 자동화 트리거 목록" />
</Frame>
This view shows all trigger integrations available for your deployment, along with their current connection status.
### Enabling and Disabling Triggers
Each trigger can easily be enabled or disabled using its toggle switch:
<Frame>
<img src="/images/enterprise/trigger-selected.png" alt="토글로 트리거 활성화 또는 비활성화" />
</Frame>
- **Enabled (blue toggle)**: The trigger is active and will automatically run your deployment when the specified events occur
- **Disabled (gray toggle)**: The trigger is inactive and will not respond to events
Simply click the toggle to change a trigger's state. Changes take effect immediately.
### Monitoring Trigger Executions
Track the performance and history of your triggered executions:
<Frame>
<img src="/images/enterprise/list-executions.png" alt="자동화에 의해 트리거된 실행 목록" />
</Frame>
## Building Automation
Before building your automation, it helps to understand the structure of the trigger payloads your crews and flows will receive; a hypothetical sample shape is sketched below.
### Payload Samples Repository
We maintain a comprehensive repository with sample payloads from various trigger sources to help you build and test your automations:
**🔗 [CrewAI Enterprise Trigger Payload Samples](https://github.com/crewAIInc/crewai-enterprise-trigger-payload-samples)**
This repository contains:
- **Real payload examples** from different trigger sources (Gmail, Google Drive, etc.)
- **Payload structure documentation** showing the format and available fields
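As a rough illustration only, here is a hypothetical payload shape consistent with the `.get('id')` and `.get('payload', {})` accessors used in the flow example below; consult the samples repository for real formats:

```python
# Hypothetical trigger payload - field names beyond 'id' and 'payload' are invented:
sample_trigger_payload = {
    "id": "trigger-run-123",
    "payload": {
        "source": "gmail",
        "subject": "New invoice received",
    },
}
```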
### Triggers with Crews
Your existing crew definitions work seamlessly with triggers; you only need a task that parses the received payload:
```python
@CrewBase
class MyAutomatedCrew:
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
)
@task
def parse_trigger_payload(self) -> Task:
return Task(
config=self.tasks_config['parse_trigger_payload'],
agent=self.researcher(),
)
@task
def analyze_trigger_content(self) -> Task:
return Task(
config=self.tasks_config['analyze_trigger_data'],
agent=self.researcher(),
)
```
The crew automatically receives the trigger payload and can access it through CrewAI's standard context mechanisms.
### Integration with Flows
For flows, you have finer-grained control over how trigger data is handled:
#### Accessing the Trigger Payload
Every `@start()` method in your flows accepts an additional parameter named `crewai_trigger_payload`:
```python
```python
from crewai.flow import Flow, start, listen
class MyAutomatedFlow(Flow):
@start()
def handle_trigger(self, crewai_trigger_payload: dict = None):
"""
이 start 메서드는 트리거 데이터를 받을 수 있습니다
"""
if crewai_trigger_payload:
# 트리거 데이터 처리
trigger_id = crewai_trigger_payload.get('id')
event_data = crewai_trigger_payload.get('payload', {})
# 다른 메서드에서 사용할 수 있도록 flow 상태에 저장
self.state.trigger_id = trigger_id
self.state.trigger_type = event_data
return event_data
# 수동 실행 처리
return None
@listen(handle_trigger)
def process_data(self, trigger_data):
"""
트리거 데이터 처리
"""
# ... 트리거 처리
```
#### Triggering Crews from Flows
When starting a crew inside a triggered flow, pass the trigger payload along:
```python
@start()
def delegate_to_crew(self, crewai_trigger_payload: dict = None):
"""
전문 crew에 처리 위임
"""
crew = MySpecializedCrew()
# crew에 트리거 페이로드 전달
result = crew.crew().kickoff(
inputs={
'a_custom_parameter': "custom_value",
'crewai_trigger_payload': crewai_trigger_payload
},
)
return result
```
## Troubleshooting
**Trigger not firing:**
- Verify that the trigger is enabled
- Check the integration's connection status
**Execution failures:**
- Check the execution logs for error details
- If you are developing, make sure the inputs include the `crewai_trigger_payload` parameter with the correct payload
Automation triggers turn your CrewAI deployments into responsive, event-driven systems that integrate seamlessly with your existing business processes and tools.

View File

@@ -44,12 +44,12 @@ To create a custom event listener, you need to:
Here is a simple example of a custom event listener class:
```python
from crewai.utilities.events import (
from crewai.events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
AgentExecutionCompletedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
class MeuListenerPersonalizado(BaseEventListener):
def __init__(self):
@@ -146,7 +146,7 @@ my_project/
```python
# my_custom_listener.py
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
# ... import events ...
class MyCustomListener(BaseEventListener):
@@ -268,7 +268,7 @@ Additional fields vary by event type. For example, `CrewKickoffCompleted
To handle events temporarily (useful for testing or specific operations), you can use the `scoped_handlers` context manager:
```python
from crewai.utilities.events import crewai_event_bus, CrewKickoffStartedEvent
from crewai.events import crewai_event_bus, CrewKickoffStartedEvent
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStartedEvent)

View File

@@ -681,11 +681,11 @@ CrewAI emits events during the knowledge retrieval process that you
#### Example: Monitoring Knowledge Retrieval
```python
from crewai.utilities.events import (
from crewai.events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
BaseEventListener,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
class KnowledgeMonitorListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):

View File

@@ -708,10 +708,10 @@ CrewAI supports streaming responses from LLMs, allowing your application
CrewAI emits an event for each chunk received while streaming:
```python
from crewai.utilities.events import (
from crewai.events import (
LLMStreamChunkEvent
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.events import BaseEventListener
class MyCustomListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):

View File

@@ -59,6 +59,7 @@ crew = Crew(
| **Pydantic Output** _(optional)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | A Pydantic model for the task output. |
| **Callback** _(optional)_ | `callback` | `Optional[Any]` | A function/object to be executed after task completion. |
| **Guardrail** _(optional)_ | `guardrail` | `Optional[Callable]` | A function that validates the task output before proceeding to the next task. |
| **Guardrail Max Retries** _(optional)_ | `guardrail_max_retries` | `Optional[int]` | Maximum number of retries when guardrail validation fails. Default is 3. |
## Creating Tasks
@@ -450,7 +451,7 @@ task = Task(
expected_output="Um objeto JSON válido",
agent=analyst,
guardrail=validate_json_output,
max_retries=3 # Limit retry attempts
guardrail_max_retries=3 # Limit retry attempts
)
```
@@ -935,7 +936,7 @@ task = Task(
description="Gerar dados",
expected_output="Dados válidos",
guardrail=validate_data,
max_retries=5 # Sobrescreve o limite padrão de tentativas
guardrail_max_retries=5 # Sobrescreve o limite padrão de tentativas
)
```

View File

@@ -58,7 +58,7 @@ Before using Authentication Integrations, make sure you have
3. Click **Connect** on the desired service in the Authentication Integrations section
4. Complete the OAuth authentication flow
5. Grant the permissions required for your use case
6. Get your Enterprise Token from your [CrewAI Enterprise](https://app.crewai.com) account page - https://app.crewai.com/crewai_plus/settings/account
6. Done! Get your Enterprise Token from the **Integration** tab in [CrewAI Enterprise](https://app.crewai.com)
<Frame>
![Integrations](/images/enterprise/enterprise_action_auth_token.png)
@@ -176,4 +176,4 @@ Use the `user_bearer_token` to scope the integration to a specific user
<Card title="Need help?" icon="headset" href="mailto:support@crewai.com">
Contact our support team for assistance with setting up integrations or troubleshooting.
</Card>
</Card>

View File

@@ -0,0 +1,171 @@
---
title: "Triggers de Automação"
description: "Execute automaticamente seus workflows CrewAI quando eventos específicos ocorrem em integrações conectadas"
icon: "bolt"
---
Automation triggers let you automatically execute your CrewAI deployments when specific events occur in your connected integrations, creating powerful event-driven workflows that react to real-time changes in your business systems.
## Overview
With automation triggers, you can:
- **Respond to real-time events** - Execute workflows automatically when specific conditions are met
- **Integrate with external systems** - Connect with platforms such as Gmail, Outlook, OneDrive, JIRA, Slack, Stripe, and more
- **Scale your automation** - Handle high-volume events without manual intervention
- **Maintain context** - Access trigger data inside your crews and flows
## Managing Automation Triggers
### Viewing Available Triggers
To access and manage your automation triggers:
1. Navigate to your deployment in the CrewAI dashboard
2. Click the **Triggers** tab to view all available trigger integrations
<Frame>
<img src="/images/enterprise/list-available-triggers.png" alt="Lista de triggers de automação disponíveis" />
</Frame>
This view shows all trigger integrations available for your deployment, along with their current connection status.
### Enabling and Disabling Triggers
Each trigger can easily be enabled or disabled using its toggle switch:
<Frame>
<img src="/images/enterprise/trigger-selected.png" alt="Habilitar ou desabilitar triggers com alternância" />
</Frame>
- **Enabled (blue toggle)**: The trigger is active and will automatically run your deployment when the specified events occur
- **Disabled (gray toggle)**: The trigger is inactive and will not respond to events
Simply click the toggle to change a trigger's state. Changes take effect immediately.
### Monitoring Trigger Executions
Track the performance and history of your triggered executions:
<Frame>
<img src="/images/enterprise/list-executions.png" alt="Lista de execuções acionadas por automação" />
</Frame>
## Building Automation
Before building your automation, it helps to understand the structure of the trigger payloads your crews and flows will receive.
### Payload Samples Repository
We maintain a comprehensive repository with sample payloads from various trigger sources to help you build and test your automations:
**🔗 [CrewAI Enterprise Trigger Payload Samples](https://github.com/crewAIInc/crewai-enterprise-trigger-payload-samples)**
This repository contains:
- **Real payload examples** from different trigger sources (Gmail, Google Drive, etc.)
- **Payload structure documentation** showing the format and available fields
### Triggers with Crews
Your existing crew definitions work seamlessly with triggers; you only need a task that parses the received payload:
```python
@CrewBase
class MinhaCrewAutomatizada:
@agent
def pesquisador(self) -> Agent:
return Agent(
config=self.agents_config['pesquisador'],
)
@task
def analisar_payload_trigger(self) -> Task:
return Task(
config=self.tasks_config['analisar_payload_trigger'],
agent=self.pesquisador(),
)
@task
def analisar_conteudo_trigger(self) -> Task:
return Task(
config=self.tasks_config['analisar_dados_trigger'],
agent=self.pesquisador(),
)
```
The crew automatically receives the trigger payload and can access it through CrewAI's standard context mechanisms.
### Integration with Flows
For flows, you have finer-grained control over how trigger data is handled:
#### Accessing the Trigger Payload
Every `@start()` method in your flows accepts an additional parameter named `crewai_trigger_payload`:
```python
```python
from crewai.flow import Flow, start, listen
class MeuFlowAutomatizado(Flow):
@start()
def lidar_com_trigger(self, crewai_trigger_payload: dict = None):
"""
Este método start pode receber dados do trigger
"""
if crewai_trigger_payload:
# Processa os dados do trigger
trigger_id = crewai_trigger_payload.get('id')
dados_evento = crewai_trigger_payload.get('payload', {})
# Armazena no estado do flow para uso por outros métodos
self.state.trigger_id = trigger_id
self.state.trigger_type = dados_evento
return dados_evento
# Lida com execução manual
return None
@listen(lidar_com_trigger)
def processar_dados(self, dados_trigger):
"""
Processa os dados do trigger
"""
# ... processa o trigger
```
#### Triggering Crews from Flows
When starting a crew inside a triggered flow, pass the trigger payload along:
```python
@start()
def delegar_para_crew(self, crewai_trigger_payload: dict = None):
"""
Delega processamento para uma crew especializada
"""
crew = MinhaCrewEspecializada()
# Passa o payload do trigger para a crew
resultado = crew.crew().kickoff(
inputs={
'parametro_personalizado': "valor_personalizado",
'crewai_trigger_payload': crewai_trigger_payload
},
)
return resultado
```
## Troubleshooting
**Trigger not firing:**
- Verify that the trigger is enabled
- Check the integration's connection status
**Execution failures:**
- Check the execution logs for error details
- If you are developing, make sure the inputs include the `crewai_trigger_payload` parameter with the correct payload
Automation triggers turn your CrewAI deployments into responsive, event-driven systems that integrate seamlessly with your existing business processes and tools.

View File

@@ -48,7 +48,7 @@ Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools~=0.62.1"]
tools = ["crewai-tools~=0.69.0"]
embeddings = [
"tiktoken~=0.8.0"
]
@@ -68,12 +68,16 @@ docling = [
aisuite = [
"aisuite>=0.1.10",
]
qdrant = [
"qdrant-client[fastembed]>=1.14.3",
]
[tool.uv]
dev-dependencies = [
"ruff>=0.8.2",
"mypy>=1.10.0",
"pre-commit>=3.6.0",
"ruff>=0.12.11",
"mypy>=1.17.1",
"pre-commit>=4.3.0",
"bandit>=1.8.6",
"pillow>=10.2.0",
"cairosvg>=2.7.1",
"pytest>=8.0.0",
@@ -85,15 +89,40 @@ dev-dependencies = [
"pytest-timeout>=2.3.1",
"pytest-xdist>=3.6.1",
"pytest-split>=0.9.0",
"types-requests==2.32.*",
"types-pyyaml==6.0.*",
"types-regex==2024.11.6.*",
"types-appdirs==1.4.*",
]
[project.scripts]
crewai = "crewai.cli.cli:crewai"
[tool.ruff]
exclude = [
"src/crewai/cli/templates",
]
fix = true
[tool.ruff.lint]
select = [
"B006",
"UP006",
"UP007",
"UP035",
"UP037",
"UP004",
"UP008",
"UP010",
"UP018",
"UP031",
"UP032",
"I001",
"I002",
]
[tool.mypy]
ignore_missing_imports = true
disable_error_code = 'import-untyped'
exclude = ["cli/templates"]
exclude = ["src/crewai/cli/templates", "tests"]
[tool.bandit]
exclude_dirs = ["src/crewai/cli/templates"]

View File

@@ -1,4 +1,30 @@
import warnings
from typing import Any
def _suppress_pydantic_deprecation_warnings() -> None:
"""Suppress Pydantic deprecation warnings using targeted monkey patch."""
original_warn = warnings.warn
def filtered_warn(
message: Any,
category: type | None = None,
stacklevel: int = 1,
source: Any = None,
) -> Any:
if (
category
and hasattr(category, "__module__")
and category.__module__ == "pydantic.warnings"
):
return None
return original_warn(message, category, stacklevel + 1, source)
setattr(warnings, "warn", filtered_warn)
_suppress_pydantic_deprecation_warnings()
import threading
import urllib.request
@@ -15,17 +41,10 @@ from crewai.tasks.llm_guardrail import LLMGuardrail
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.telemetry import Telemetry
warnings.filterwarnings(
"ignore",
message="Pydantic serializer warnings:",
category=UserWarning,
module="pydantic.main",
)
_telemetry_submitted = False
def _track_install():
def _track_install() -> None:
"""Track package installation/first-use via Scarf analytics."""
global _telemetry_submitted
@@ -36,7 +55,7 @@ def _track_install():
pixel_url = "https://api.scarf.sh/v2/packages/CrewAI/crewai/docs/00f2dad1-8334-4a39-934e-003b2e1146db"
req = urllib.request.Request(pixel_url)
req.add_header('User-Agent', f'CrewAI-Python/{__version__}')
req.add_header("User-Agent", f"CrewAI-Python/{__version__}")
with urllib.request.urlopen(req, timeout=2): # nosec B310
_telemetry_submitted = True
@@ -45,7 +64,7 @@ def _track_install():
pass
def _track_install_async():
def _track_install_async() -> None:
"""Track installation in background thread to avoid blocking imports."""
if not Telemetry._is_telemetry_disabled():
thread = threading.Thread(target=_track_install, daemon=True)
@@ -54,7 +73,7 @@ def _track_install_async():
_track_install_async()
__version__ = "0.165.1"
__version__ = "0.177.0"
__all__ = [
"Agent",
"Crew",

View File

@@ -1,18 +1,50 @@
import shutil
import subprocess
import time
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Type, Union
from collections.abc import Callable, Sequence
from typing import (
Any,
Literal,
Optional,
)
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
from pydantic import (
BeforeValidator,
Field,
InstanceOf,
PrivateAttr,
computed_field,
field_validator,
model_validator,
)
from typing_extensions import Self
from crewai.agents import CacheHandler
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeRetrievalStartedEvent,
KnowledgeSearchQueryFailedEvent,
)
from crewai.events.types.memory_events import (
MemoryRetrievalCompletedEvent,
MemoryRetrievalStartedEvent,
)
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
from crewai.lite_agent import LiteAgent, LiteAgentOutput
from crewai.llm import BaseLLM
from crewai.llms.base_llm import BaseLLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.security import Fingerprint
from crewai.task import Task
@@ -27,25 +59,7 @@ from crewai.utilities.agent_utils import (
)
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
from crewai.utilities.events.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.memory_events import (
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
)
from crewai.utilities.events.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeRetrievalStartedEvent,
KnowledgeSearchQueryFailedEvent,
)
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.llm_utils import create_default_llm, create_llm
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -76,6 +90,8 @@ class Agent(BaseAgent):
"""
_times_executed: int = PrivateAttr(default=0)
_llm: BaseLLM = PrivateAttr()
_function_calling_llm: BaseLLM | None = PrivateAttr(default=None)
max_execution_time: Optional[int] = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
@@ -90,10 +106,11 @@ class Agent(BaseAgent):
default=True,
description="Use system prompt for the agent.",
)
llm: Union[str, InstanceOf[BaseLLM], Any] = Field(
description="Language model that will run the agent.", default=None
llm: str | InstanceOf[BaseLLM] | None = Field(
description="Language model that will run the agent.",
default_factory=create_default_llm,
)
function_calling_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
function_calling_llm: str | InstanceOf[BaseLLM] | None = Field(
description="Language model that will run the agent.", default=None
)
system_template: Optional[str] = Field(
@@ -140,7 +157,7 @@ class Agent(BaseAgent):
default=None,
description="Maximum number of reasoning attempts before executing the task. If None, will try until ready.",
)
embedder: Optional[Dict[str, Any]] = Field(
embedder: Optional[dict[str, Any]] = Field(
default=None,
description="Embedder configuration for the agent.",
)
@@ -160,29 +177,45 @@ class Agent(BaseAgent):
default=None,
description="The Agent's role to be used from your repository.",
)
guardrail: Optional[Union[Callable[[Any], Tuple[bool, Any]], str]] = Field(
guardrail: Optional[Callable[[Any], tuple[bool, Any]] | str] = Field(
default=None,
description="Function or string description of a guardrail to validate agent output"
description="Function or string description of a guardrail to validate agent output",
)
guardrail_max_retries: int = Field(
default=3, description="Maximum number of retries when guardrail fails"
)
@model_validator(mode="before")
def validate_from_repository(cls, v):
@classmethod
def validate_from_repository(cls, v: Any) -> Any:
if v is not None and (from_repository := v.get("from_repository")):
return load_agent_from_repository(from_repository) | v
return v
@field_validator("function_calling_llm", mode="after")
@classmethod
def validate_function_calling_llm(cls, v: Any) -> BaseLLM | None:
if not v or isinstance(v, BaseLLM):
return v
return create_llm(v)
@model_validator(mode="after")
def post_init_setup(self):
def post_init_setup(self) -> Self:
self.agent_ops_agent_name = self.role
self.llm = create_llm(self.llm)
if self.function_calling_llm and not isinstance(
self.function_calling_llm, BaseLLM
):
self.function_calling_llm = create_llm(self.function_calling_llm)
# Validate and set the private LLM attributes
if isinstance(self.llm, BaseLLM):
self._llm = self.llm
elif self.llm is None:
self._llm = create_default_llm()
else:
self._llm = create_llm(self.llm)
if self.function_calling_llm:
if isinstance(self.function_calling_llm, BaseLLM):
self._function_calling_llm = self.function_calling_llm
else:
self._function_calling_llm = create_llm(self.function_calling_llm)
if not self.agent_executor:
self._setup_agent_executor()
@@ -192,12 +225,12 @@ class Agent(BaseAgent):
return self
def _setup_agent_executor(self):
def _setup_agent_executor(self) -> None:
if not self.cache_handler:
self.cache_handler = CacheHandler()
self.set_cache_handler(self.cache_handler)
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
def set_knowledge(self, crew_embedder: Optional[dict[str, Any]] = None) -> None:
try:
if self.embedder is None and crew_embedder:
self.embedder = crew_embedder
@@ -234,8 +267,8 @@ class Agent(BaseAgent):
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
tools: Optional[list[BaseTool]] = None,
) -> Any:
"""Execute a task with the agent.
Args:
@@ -276,7 +309,7 @@ class Agent(BaseAgent):
self._inject_date_to_task(task)
if self.tools_handler:
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")
self.tools_handler.last_used_tool = None
task_prompt = task.prompt()
@@ -309,15 +342,20 @@ class Agent(BaseAgent):
event=MemoryRetrievalStartedEvent(
task_id=str(task.id) if task else None,
source_type="agent",
from_agent=self,
from_task=task,
),
)
start_time = time.time()
contextual_memory = ContextualMemory(
self.crew._short_term_memory,
self.crew._long_term_memory,
self.crew._entity_memory,
self.crew._external_memory,
agent=self,
task=task,
)
memory = contextual_memory.build_context_for_task(task, context)
if memory.strip() != "":
@@ -330,13 +368,14 @@ class Agent(BaseAgent):
memory_content=memory,
retrieval_time_ms=(time.time() - start_time) * 1000,
source_type="agent",
from_agent=self,
from_task=task,
),
)
knowledge_config = (
self.knowledge_config.model_dump() if self.knowledge_config else {}
)
if self.knowledge or (self.crew and self.crew.knowledge):
crewai_event_bus.emit(
self,
@@ -400,7 +439,7 @@ class Agent(BaseAgent):
)
tools = tools or self.tools or []
self.create_agent_executor(tools=tools, task=task)
self.create_agent_executor(task=task, tools=tools)
if self.crew and self.crew._train:
task_prompt = self._training_handler(task_prompt=task_prompt)
@@ -475,7 +514,7 @@ class Agent(BaseAgent):
# If there was any tool in self.tools_results that had result_as_answer
# set to True, return the results of the last tool that had
# result_as_answer set to True
for tool_result in self.tools_results: # type: ignore # Item "None" of "list[Any] | None" has no attribute "__iter__" (not iterable)
for tool_result in self.tools_results:
if tool_result.get("result_as_answer", False):
result = tool_result["result"]
crewai_event_bus.emit(
@@ -484,7 +523,7 @@ class Agent(BaseAgent):
)
return result
def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> str:
def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> Any:
"""Execute a task with a timeout.
Args:
@@ -517,7 +556,7 @@ class Agent(BaseAgent):
future.cancel()
raise RuntimeError(f"Task execution failed: {str(e)}")
def _execute_without_timeout(self, task_prompt: str, task: Task) -> str:
def _execute_without_timeout(self, task_prompt: str, task: Task) -> Any:
"""Execute a task without a timeout.
Args:
@@ -527,6 +566,9 @@ class Agent(BaseAgent):
Returns:
The output of the agent.
"""
assert self.agent_executor is not None, (
"Agent executor must be created before execution"
)
return self.agent_executor.invoke(
{
"input": task_prompt,
@@ -537,14 +579,15 @@ class Agent(BaseAgent):
)["output"]
def create_agent_executor(
self, tools: Optional[List[BaseTool]] = None, task=None
self, task: Task, tools: Optional[list[BaseTool]] = None
) -> None:
"""Create an agent executor for the agent.
Returns:
An instance of the CrewAgentExecutor class.
Args:
task: Task to execute.
tools: Optional list of tools to use.
"""
raw_tools: List[BaseTool] = tools or self.tools or []
raw_tools: list[BaseTool] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)
prompt = Prompts(
@@ -565,7 +608,7 @@ class Agent(BaseAgent):
)
self.agent_executor = CrewAgentExecutor(
llm=self.llm,
llm=self._llm,
task=task,
agent=self,
crew=self.crew,
@@ -578,15 +621,15 @@ class Agent(BaseAgent):
tools_names=get_tool_names(parsed_tools),
tools_description=render_text_description_and_args(parsed_tools),
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
function_calling_llm=self._function_calling_llm,
respect_context_window=self.respect_context_window,
request_within_rpm_limit=(
self._rpm_controller.check_or_wait if self._rpm_controller else None
),
callbacks=[TokenCalcHandler(self._token_process)],
litellm_callbacks=[TokenCalcHandler(self._token_process)],
)
def get_delegation_tools(self, agents: List[BaseAgent]):
def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
agent_tools = AgentTools(agents=agents)
tools = agent_tools.tools()
return tools
@@ -596,7 +639,7 @@ class Agent(BaseAgent):
return [AddImageTool()]
def get_code_execution_tools(self):
def get_code_execution_tools(self) -> list[BaseTool]:
try:
from crewai_tools import CodeInterpreterTool # type: ignore
@@ -607,8 +650,11 @@ class Agent(BaseAgent):
self._logger.log(
"info", "Coding tools not available. Install crewai_tools. "
)
return []
def get_output_converter(self, llm, text, model, instructions):
def get_output_converter(
self, llm: BaseLLM, text: str, model: str, instructions: str
) -> Converter:
return Converter(llm=llm, text=text, model=model, instructions=instructions)
def _training_handler(self, task_prompt: str) -> str:
@@ -637,7 +683,7 @@ class Agent(BaseAgent):
)
return task_prompt
def _render_text_description(self, tools: List[Any]) -> str:
def _render_text_description(self, tools: list[Any]) -> str:
"""Render the tool name and description in plain text.
Output will be in the format of:
@@ -656,7 +702,7 @@ class Agent(BaseAgent):
return description
def _inject_date_to_task(self, task):
def _inject_date_to_task(self, task: Task) -> None:
"""Inject the current date into the task description if inject_date is enabled."""
if self.inject_date:
from datetime import datetime
@@ -706,7 +752,7 @@ class Agent(BaseAgent):
f"Docker is not running. Please start Docker to use code execution with agent: {self.role}"
)
def __repr__(self):
def __repr__(self) -> str:
return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"
@property
@@ -719,7 +765,7 @@ class Agent(BaseAgent):
"""
return self.security_config.fingerprint
def set_fingerprint(self, fingerprint: Fingerprint):
def set_fingerprint(self, fingerprint: Fingerprint) -> None:
self.security_config.fingerprint = fingerprint
def _get_knowledge_search_query(self, task_prompt: str) -> str | None:
@@ -735,22 +781,8 @@ class Agent(BaseAgent):
task_prompt=task_prompt
)
rewriter_prompt = self.i18n.slice("knowledge_search_query_system_prompt")
if not isinstance(self.llm, BaseLLM):
self._logger.log(
"warning",
f"Knowledge search query failed: LLM for agent '{self.role}' is not an instance of BaseLLM",
)
crewai_event_bus.emit(
self,
event=KnowledgeQueryFailedEvent(
agent=self,
error="LLM is not compatible with knowledge search queries",
),
)
return None
try:
rewritten_query = self.llm.call(
rewritten_query = self._llm.call(
[
{
"role": "system",
@@ -779,8 +811,8 @@ class Agent(BaseAgent):
def kickoff(
self,
messages: Union[str, List[Dict[str, str]]],
response_format: Optional[Type[Any]] = None,
messages: str | list[dict[str, str]],
response_format: Optional[type[Any]] = None,
) -> LiteAgentOutput:
"""
Execute the agent with the given messages using a LiteAgent instance.
@@ -802,7 +834,7 @@ class Agent(BaseAgent):
role=self.role,
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
llm=self._llm,
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
@@ -819,8 +851,8 @@ class Agent(BaseAgent):
async def kickoff_async(
self,
messages: Union[str, List[Dict[str, str]]],
response_format: Optional[Type[Any]] = None,
messages: str | list[dict[str, str]],
response_format: Optional[type[Any]] = None,
) -> LiteAgentOutput:
"""
Execute the agent asynchronously with the given messages using a LiteAgent instance.
@@ -840,7 +872,7 @@ class Agent(BaseAgent):
role=self.role,
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
llm=self._llm,
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,

View File

@@ -1,5 +1,10 @@
from .cache.cache_handler import CacheHandler
from .parser import CrewAgentParser
from .tools_handler import ToolsHandler
from crewai.agents.parser import parse, AgentAction, AgentFinish, OutputParserException
from crewai.agents.tools_handler import ToolsHandler
__all__ = ["CacheHandler", "CrewAgentParser", "ToolsHandler"]
__all__ = [
"parse",
"AgentAction",
"AgentFinish",
"OutputParserException",
"ToolsHandler",
]
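A hypothetical usage sketch of the functional parser API exported above (the `parse` signature and the result fields are assumptions based on the names alone):

```python
from crewai.agents import parse, AgentAction, AgentFinish, OutputParserException

try:
    result = parse("Thought: I know the answer\nFinal Answer: 42")  # signature assumed
    if isinstance(result, AgentFinish):
        print(result.output)        # `output` field assumed
    elif isinstance(result, AgentAction):
        print(result.tool)          # `tool` field assumed
except OutputParserException as exc:
    print(f"Unparsable output: {exc}")
```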

View File

@@ -1,4 +1,4 @@
from typing import Any, AsyncIterable, Dict, List, Optional
from typing import Any, Optional
from pydantic import Field, PrivateAttr
@@ -10,21 +10,22 @@ from crewai.agents.agent_adapters.langgraph.structured_output_converter import (
LangGraphConverterAdapter,
)
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool
from crewai.utilities import Logger
from crewai.utilities.converter import Converter
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.agent_events import (
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import BaseTool
from crewai.utilities import Logger
from crewai.utilities.converter import Converter
try:
from langchain_core.messages import ToolMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import ( # type: ignore
MemorySaver,
)
from langgraph.prebuilt import create_react_agent # type: ignore
LANGGRAPH_AVAILABLE = True
except ImportError:
@@ -52,11 +53,11 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
role: str,
goal: str,
backstory: str,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
llm: Any = None,
max_iterations: int = 10,
agent_config: Optional[Dict[str, Any]] = None,
**kwargs,
agent_config: Optional[dict[str, Any]] = None,
**kwargs: Any,
):
"""Initialize the LangGraph agent adapter."""
if not LANGGRAPH_AVAILABLE:
@@ -82,7 +83,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
try:
self._memory = MemorySaver()
converted_tools: List[Any] = self._tool_adapter.tools()
converted_tools: list[Any] = self._tool_adapter.tools()
if self._agent_config:
self._graph = create_react_agent(
model=self.llm,
@@ -112,7 +113,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
"""Build a system prompt for the LangGraph agent."""
base_prompt = f"""
You are {self.role}.
Your goal is: {self.goal}
Your backstory: {self.backstory}
@@ -125,10 +126,10 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
) -> str:
"""Execute a task using the LangGraph workflow."""
self.create_agent_executor(tools)
self.create_agent_executor(task, tools)
self.configure_structured_output(task)
@@ -198,11 +199,13 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
)
raise
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
def create_agent_executor(
self, task: Any = None, tools: Optional[list[BaseTool]] = None
) -> None:
"""Configure the LangGraph agent for execution."""
self.configure_tools(tools)
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
def configure_tools(self, tools: Optional[list[BaseTool]] = None) -> None:
"""Configure tools for the LangGraph agent."""
if tools:
all_tools = list(self.tools or []) + list(tools or [])
@@ -210,7 +213,7 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
available_tools = self._tool_adapter.tools()
self._graph.tools = available_tools
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
"""Implement delegation tools support for LangGraph."""
agent_tools = AgentTools(agents=agents)
return agent_tools.tools()
@@ -221,6 +224,6 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
"""Convert output format if needed."""
return Converter(llm=llm, text=text, model=model, instructions=instructions)
def configure_structured_output(self, task) -> None:
def configure_structured_output(self, task: Any) -> None:
"""Configure the structured output for LangGraph."""
self._converter_adapter.configure_structured_output(task)

View File

@@ -1,4 +1,5 @@
import json
from typing import Any
from crewai.agents.agent_adapters.base_converter_adapter import BaseConverterAdapter
from crewai.utilities.converter import generate_model_description
@@ -7,14 +8,15 @@ from crewai.utilities.converter import generate_model_description
class LangGraphConverterAdapter(BaseConverterAdapter):
"""Adapter for handling structured output conversion in LangGraph agents"""
def __init__(self, agent_adapter):
def __init__(self, agent_adapter: Any) -> None:
"""Initialize the converter adapter with a reference to the agent adapter"""
super().__init__(agent_adapter) # type: ignore
self.agent_adapter = agent_adapter
self._output_format = None
self._schema = None
self._system_prompt_appendix = None
self._output_format: str | None = None
self._schema: str | None = None
self._system_prompt_appendix: str | None = None
def configure_structured_output(self, task) -> None:
def configure_structured_output(self, task: Any) -> None:
"""Configure the structured output for LangGraph."""
if not (task.output_json or task.output_pydantic):
self._output_format = None
@@ -41,7 +43,7 @@ Important: Your final answer MUST be provided in the following structured format
{self._schema}
DO NOT include any markdown code blocks, backticks, or other formatting around your response.
DO NOT include any markdown code blocks, backticks, or other formatting around your response.
The output should be raw JSON that exactly matches the specified schema.
"""

View File

@@ -1,4 +1,4 @@
from typing import Any, List, Optional
from typing import Any, Optional
from pydantic import Field, PrivateAttr
@@ -7,19 +7,19 @@ from crewai.agents.agent_adapters.openai_agents.structured_output_converter impo
OpenAIConverterAdapter,
)
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Logger
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.agent_events import (
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities import Logger
try:
from agents import Agent as OpenAIAgent # type: ignore
from agents import Runner, enable_verbose_stdout_logging # type: ignore
from agents import Agent as OpenAIAgent # type: ignore[import-not-found]
from agents import Runner, enable_verbose_stdout_logging
from .openai_agent_tool_adapter import OpenAIAgentToolAdapter
@@ -40,13 +40,14 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
step_callback: Any = Field(default=None)
_tool_adapter: "OpenAIAgentToolAdapter" = PrivateAttr()
_converter_adapter: OpenAIConverterAdapter = PrivateAttr()
agent_executor: Any = Field(default=None)
def __init__(
self,
model: str = "gpt-4o-mini",
tools: Optional[List[BaseTool]] = None,
agent_config: Optional[dict] = None,
**kwargs,
tools: Optional[list[BaseTool]] = None,
agent_config: Optional[dict[str, Any]] = None,
**kwargs: Any,
):
if not OPENAI_AVAILABLE:
raise ImportError(
@@ -72,7 +73,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
"""Build a system prompt for the OpenAI agent."""
base_prompt = f"""
You are {self.role}.
Your goal is: {self.goal}
Your backstory: {self.backstory}
@@ -85,11 +86,11 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
tools: Optional[list[BaseTool]] = None,
) -> Any:
"""Execute a task using the OpenAI Assistant"""
self._converter_adapter.configure_structured_output(task)
self.create_agent_executor(tools)
self.create_agent_executor(task, tools)
if self.verbose:
enable_verbose_stdout_logging()
@@ -109,6 +110,7 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
task=task,
),
)
assert hasattr(self, "agent_executor"), "agent_executor not initialized"
result = self.agent_executor.run_sync(self._openai_agent, task_prompt)
final_answer = self.handle_execution_result(result)
crewai_event_bus.emit(
@@ -131,7 +133,9 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
)
raise
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
def create_agent_executor(
self, task: Any = None, tools: Optional[list[BaseTool]] = None
) -> None:
"""
Configure the OpenAI agent for execution.
While OpenAI handles execution differently through Runner,
@@ -152,24 +156,24 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
self.agent_executor = Runner
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
def configure_tools(self, tools: Optional[list[BaseTool]] = None) -> None:
"""Configure tools for the OpenAI Assistant"""
if tools:
self._tool_adapter.configure_tools(tools)
if self._tool_adapter.converted_tools:
self._openai_agent.tools = self._tool_adapter.converted_tools
def handle_execution_result(self, result: Any) -> str:
def handle_execution_result(self, result: Any) -> Any:
"""Process OpenAI Assistant execution result converting any structured output to a string"""
return self._converter_adapter.post_process_result(result.final_output)
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
def get_delegation_tools(self, agents: list[BaseAgent]) -> list[BaseTool]:
"""Implement delegation tools support"""
agent_tools = AgentTools(agents=agents)
tools = agent_tools.tools()
return tools
def configure_structured_output(self, task) -> None:
def configure_structured_output(self, task: Any) -> None:
"""Configure the structured output for the specific agent implementation.
Args:

View File

@@ -1,5 +1,6 @@
import json
import re
from typing import Any
from crewai.agents.agent_adapters.base_converter_adapter import BaseConverterAdapter
from crewai.utilities.converter import generate_model_description
@@ -19,14 +20,15 @@ class OpenAIConverterAdapter(BaseConverterAdapter):
_output_model: The Pydantic model for the output
"""
def __init__(self, agent_adapter):
def __init__(self, agent_adapter: Any) -> None:
"""Initialize the converter adapter with a reference to the agent adapter"""
super().__init__(agent_adapter) # type: ignore
self.agent_adapter = agent_adapter
self._output_format = None
self._schema = None
self._output_model = None
self._output_format: str | None = None
self._schema: str | None = None
self._output_model: Any = None
def configure_structured_output(self, task) -> None:
def configure_structured_output(self, task: Any) -> None:
"""
Configure the structured output for OpenAI agent based on task requirements.
@@ -75,7 +77,7 @@ class OpenAIConverterAdapter(BaseConverterAdapter):
return f"{base_prompt}\n\n{output_schema}"
def post_process_result(self, result: str) -> str:
def post_process_result(self, result: str) -> Any:
"""
Post-process the result to ensure it matches the expected format.

View File

@@ -1,8 +1,9 @@
import uuid
from abc import ABC, abstractmethod
from collections.abc import Callable
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, TypeVar
from typing import Any, Optional, TypeVar
from pydantic import (
UUID4,
@@ -14,6 +15,7 @@ from pydantic import (
model_validator,
)
from pydantic_core import PydanticCustomError
from typing_extensions import Self
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.agents.cache.cache_handler import CacheHandler
@@ -61,7 +63,7 @@ class BaseAgent(ABC, BaseModel):
Methods:
execute_task(task: Any, context: Optional[str] = None, tools: Optional[List[BaseTool]] = None) -> str:
Abstract method to execute a task.
create_agent_executor(tools=None) -> None:
create_agent_executor(task, tools=None) -> None:
Abstract method to create an agent executor.
get_delegation_tools(agents: List["BaseAgent"]):
Abstract method to set the agents task tools for handling delegation and question asking to other agents in crew.
@@ -79,7 +81,7 @@ class BaseAgent(ABC, BaseModel):
Set private attributes.
"""
__hash__ = object.__hash__ # type: ignore
__hash__ = object.__hash__
_logger: Logger = PrivateAttr(default_factory=lambda: Logger(verbose=False))
_rpm_controller: Optional[RPMController] = PrivateAttr(default=None)
_request_within_rpm_limit: Any = PrivateAttr(default=None)
@@ -91,7 +93,7 @@ class BaseAgent(ABC, BaseModel):
role: str = Field(description="Role of the agent")
goal: str = Field(description="Objective of the agent")
backstory: str = Field(description="Backstory of the agent")
config: Optional[Dict[str, Any]] = Field(
config: Optional[dict[str, Any]] = Field(
description="Configuration for the agent", default=None, exclude=True
)
cache: bool = Field(
@@ -108,14 +110,14 @@ class BaseAgent(ABC, BaseModel):
default=False,
description="Enable agent to delegate and ask questions among each other.",
)
tools: Optional[List[BaseTool]] = Field(
tools: Optional[list[BaseTool]] = Field(
default_factory=list, description="Tools at agents' disposal"
)
max_iter: int = Field(
default=25, description="Maximum iterations for an agent to execute a task"
)
agent_executor: InstanceOf = Field(
default=None, description="An instance of the CrewAgentExecutor class."
agent_executor: Optional[Any] = Field(
default=None, description="An instance of the agent executor class."
)
llm: Any = Field(
default=None, description="Language model that will run the agent."
@@ -129,7 +131,7 @@ class BaseAgent(ABC, BaseModel):
default_factory=ToolsHandler,
description="An instance of the ToolsHandler class.",
)
tools_results: List[Dict[str, Any]] = Field(
tools_results: list[dict[str, Any]] = Field(
default=[], description="Results of the tools used by the agent."
)
max_tokens: Optional[int] = Field(
@@ -138,7 +140,7 @@ class BaseAgent(ABC, BaseModel):
knowledge: Optional[Knowledge] = Field(
default=None, description="Knowledge for the agent."
)
knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
knowledge_sources: Optional[list[BaseKnowledgeSource]] = Field(
default=None,
description="Knowledge sources for the agent.",
)
@@ -150,7 +152,7 @@ class BaseAgent(ABC, BaseModel):
default_factory=SecurityConfig,
description="Security configuration for the agent, including fingerprinting.",
)
callbacks: List[Callable] = Field(
callbacks: list[Callable[..., Any]] = Field(
default=[], description="Callbacks to be used for the agent"
)
adapted_agent: bool = Field(
@@ -163,12 +165,12 @@ class BaseAgent(ABC, BaseModel):
@model_validator(mode="before")
@classmethod
def process_model_config(cls, values):
def process_model_config(cls, values: Any) -> Any:
return process_config(values, cls)
@field_validator("tools")
@classmethod
def validate_tools(cls, tools: List[Any]) -> List[BaseTool]:
def validate_tools(cls, tools: list[Any]) -> list[BaseTool]:
"""Validate and process the tools provided to the agent.
This method ensures that each tool is either an instance of BaseTool
@@ -196,7 +198,7 @@ class BaseAgent(ABC, BaseModel):
return processed_tools
@model_validator(mode="after")
def validate_and_set_attributes(self):
def validate_and_set_attributes(self) -> Self:
# Validate required fields
for field in ["role", "goal", "backstory"]:
if getattr(self, field) is None:
@@ -228,7 +230,7 @@ class BaseAgent(ABC, BaseModel):
)
@model_validator(mode="after")
def set_private_attrs(self):
def set_private_attrs(self) -> Self:
"""Set private attributes."""
self._logger = Logger(verbose=self.verbose)
if self.max_rpm and not self._rpm_controller:
@@ -240,7 +242,7 @@ class BaseAgent(ABC, BaseModel):
return self
@property
def key(self):
def key(self) -> str:
source = [
self._original_role or self.role,
self._original_goal or self.goal,
@@ -253,16 +255,18 @@ class BaseAgent(ABC, BaseModel):
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
tools: Optional[list[BaseTool]] = None,
) -> str:
pass
@abstractmethod
def create_agent_executor(self, tools=None) -> None:
def create_agent_executor(
self, task: Any, tools: Optional[list[BaseTool]] = None
) -> None:
pass
@abstractmethod
def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[BaseTool]:
def get_delegation_tools(self, agents: list["BaseAgent"]) -> list[BaseTool]:
"""Set the task tools that init BaseAgenTools class."""
pass
@@ -320,7 +324,7 @@ class BaseAgent(ABC, BaseModel):
return copied_agent
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
def interpolate_inputs(self, inputs: dict[str, Any]) -> None:
"""Interpolate inputs into the agent description and backstory."""
if self._original_role is None:
self._original_role = self.role
@@ -350,7 +354,7 @@ class BaseAgent(ABC, BaseModel):
if self.cache:
self.cache_handler = cache_handler
self.tools_handler.cache = cache_handler
self.create_agent_executor()
# Executor will be created when a task is executed
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.
@@ -360,7 +364,7 @@ class BaseAgent(ABC, BaseModel):
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.create_agent_executor()
# Executor will be created when a task is executed
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
def set_knowledge(self, crew_embedder: Optional[dict[str, Any]] = None) -> None:
pass

View File

@@ -1,31 +1,32 @@
import time
from typing import TYPE_CHECKING, Dict, List
from typing import TYPE_CHECKING
from crewai.events.event_listener import event_listener
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.utilities import I18N
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.printer import Printer
from crewai.utilities.events.event_listener import event_listener
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.parser import AgentFinish
from crewai.crew import Crew
from crewai.task import Task
class CrewAgentExecutorMixin:
crew: "Crew"
crew: "Crew | None"
agent: "BaseAgent"
task: "Task"
iterations: int
max_iter: int
messages: List[Dict[str, str]]
messages: list[dict[str, str]]
_i18n: I18N
_printer: Printer = Printer()
def _create_short_term_memory(self, output) -> None:
def _create_short_term_memory(self, output: "AgentFinish") -> None:
"""Create and save a short-term memory item if conditions are met."""
if (
self.crew
@@ -35,7 +36,8 @@ class CrewAgentExecutorMixin:
):
try:
if (
hasattr(self.crew, "_short_term_memory")
self.crew
and hasattr(self.crew, "_short_term_memory")
and self.crew._short_term_memory
):
self.crew._short_term_memory.save(
@@ -43,13 +45,12 @@ class CrewAgentExecutorMixin:
metadata={
"observation": self.task.description,
},
agent=self.agent.role,
)
except Exception as e:
print(f"Failed to add to short term memory: {e}")
pass
def _create_external_memory(self, output) -> None:
def _create_external_memory(self, output: "AgentFinish") -> None:
"""Create and save a external-term memory item if conditions are met."""
if (
self.crew
@@ -65,13 +66,12 @@ class CrewAgentExecutorMixin:
"description": self.task.description,
"messages": self.messages,
},
agent=self.agent.role,
)
except Exception as e:
print(f"Failed to add to external memory: {e}")
pass
def _create_long_term_memory(self, output) -> None:
def _create_long_term_memory(self, output: "AgentFinish") -> None:
"""Create and save long-term and entity memory items based on evaluation."""
if (
self.crew
@@ -100,8 +100,8 @@ class CrewAgentExecutorMixin:
)
self.crew._long_term_memory.save(long_term_memory)
for entity in evaluation.entities:
entity_memory = EntityMemoryItem(
entity_memories = [
EntityMemoryItem(
name=entity.name,
type=entity.type,
description=entity.description,
@@ -109,7 +109,10 @@ class CrewAgentExecutorMixin:
[f"- {r}" for r in entity.relationships]
),
)
self.crew._entity_memory.save(entity_memory)
for entity in evaluation.entities
]
if entity_memories:
self.crew._entity_memory.save(entity_memories)
except AttributeError as e:
print(f"Missing attributes for long term memory: {e}")
pass
@@ -158,7 +161,9 @@ class CrewAgentExecutorMixin:
self._printer.print(content=prompt, color="bold_yellow")
response = input()
if response.strip() != "":
self._printer.print(content="\nProcessing your feedback...", color="cyan")
self._printer.print(
content="\nProcessing your feedback...", color="cyan"
)
return response
finally:
event_listener.formatter.resume_live_updates()

View File

@@ -1,3 +1,5 @@
"""Internal caching utilities for agent tool execution.
This package provides caching mechanisms for storing and retrieving
tool execution results to avoid redundant operations.
"""
from .cache_handler import CacheHandler
__all__ = ["CacheHandler"]

View File

@@ -1,15 +1,50 @@
from typing import Any, Dict, Optional
"""Cache handler for storing and retrieving tool execution results.
This module provides a caching mechanism for tool outputs in the CrewAI framework,
allowing agents to reuse previous tool execution results when the same tool is
called with identical arguments.
Classes:
CacheHandler: Manages the caching of tool execution results using an in-memory
dictionary with serialized tool arguments as keys.
"""
import json
from typing import Any
from pydantic import BaseModel, PrivateAttr
class CacheHandler(BaseModel):
"""Callback handler for tool usage."""
"""Callback handler for tool usage.
_cache: Dict[str, Any] = PrivateAttr(default_factory=dict)
def add(self, tool, input, output):
self._cache[f"{tool}-{input}"] = output
Notes:
TODO: Make thread-safe, currently not thread-safe.
"""
def read(self, tool, input) -> Optional[str]:
return self._cache.get(f"{tool}-{input}")
_cache: dict[str, Any] = PrivateAttr(default_factory=dict)
def add(self, tool: str, input_data: dict[str, Any] | None, output: str) -> None:
"""Add a tool execution result to the cache.
Args:
tool: The name of the tool.
input_data: The input arguments for the tool.
output: The output from the tool execution.
"""
cache_key = json.dumps(input_data, sort_keys=True) if input_data else ""
self._cache[f"{tool}-{cache_key}"] = output
def read(self, tool: str, input_data: dict[str, Any] | None) -> str | None:
"""Read a tool execution result from the cache.
Args:
tool: The name of the tool.
input_data: The input arguments for the tool.
Returns:
The cached output if found, None otherwise.
"""
cache_key = json.dumps(input_data, sort_keys=True) if input_data else ""
return self._cache.get(f"{tool}-{cache_key}")
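A quick sanity check of the new key scheme (a minimal sketch using only the CacheHandler shown above; tool name and arguments are invented): json.dumps(..., sort_keys=True) canonicalizes the arguments, so identical inputs hit the same entry regardless of key order.

from crewai.agents.cache.cache_handler import CacheHandler

handler = CacheHandler()
handler.add(tool="search", input_data={"query": "weather", "limit": 5}, output="sunny")
# sort_keys=True canonicalizes the JSON, so reordered keys hit the entry above.
assert handler.read(tool="search", input_data={"limit": 5, "query": "weather"}) == "sunny"
# Unknown inputs miss and return None.
assert handler.read(tool="search", input_data={"query": "rain"}) is None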

View File

@@ -0,0 +1,27 @@
"""Constants for agent-related modules."""
import re
from typing import Final
# crewai.agents.parser constants
FINAL_ANSWER_ACTION: Final[str] = "Final Answer:"
MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE: Final[str] = (
"I did it wrong. Invalid Format: I missed the 'Action:' after 'Thought:'. I will do right next, and don't use a tool I have already used.\n"
)
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE: Final[str] = (
"I did it wrong. Invalid Format: I missed the 'Action Input:' after 'Action:'. I will do right next, and don't use a tool I have already used.\n"
)
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE: Final[str] = (
"I did it wrong. Tried to both perform Action and give a Final Answer at the same time, I must do one or the other"
)
UNABLE_TO_REPAIR_JSON_RESULTS: Final[list[str]] = ['""', "{}"]
ACTION_INPUT_REGEX: Final[re.Pattern[str]] = re.compile(
r"Action\s*\d*\s*:\s*(.*?)\s*Action\s*\d*\s*Input\s*\d*\s*:\s*(.*)", re.DOTALL
)
ACTION_REGEX: Final[re.Pattern[str]] = re.compile(
r"Action\s*\d*\s*:\s*(.*?)", re.DOTALL
)
ACTION_INPUT_ONLY_REGEX: Final[re.Pattern[str]] = re.compile(
r"\s*Action\s*\d*\s*Input\s*\d*\s*:\s*(.*)", re.DOTALL
)
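As a rough illustration of what the new constants match (the completion text is invented), ACTION_INPUT_REGEX captures the tool name and its input from a ReAct-formatted response:

from crewai.agents.constants import ACTION_INPUT_REGEX

text = 'Thought: I should look this up\nAction: search\nAction Input: {"query": "weather"}'
match = ACTION_INPUT_REGEX.search(text)
assert match is not None
print(match.group(1))  # -> search
print(match.group(2))  # -> {"query": "weather"}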

View File

@@ -1,4 +1,17 @@
from typing import Any, Callable, Dict, List, Optional, Union
"""Agent executor for crew AI agents.
Handles agent execution flow including LLM interactions, tool execution,
and memory management.
"""
from __future__ import annotations
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from crewai.crew import Crew
from crewai.task import Task
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
@@ -8,7 +21,12 @@ from crewai.agents.parser import (
OutputParserException,
)
from crewai.agents.tools_handler import ToolsHandler
from crewai.llm import BaseLLM
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.logging_events import (
AgentLogsExecutionEvent,
AgentLogsStartedEvent,
)
from crewai.llms.base_llm import BaseLLM
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
@@ -26,54 +44,73 @@ from crewai.utilities.agent_utils import (
is_context_length_exceeded,
process_llm_response,
)
from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.logger import Logger
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.tool_utils import execute_tool_and_check_finality
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.utilities.events.agent_events import (
AgentLogsStartedEvent,
AgentLogsExecutionEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
class CrewAgentExecutor(CrewAgentExecutorMixin):
_logger: Logger = Logger()
"""Executor for crew agents.
Manages the execution lifecycle of an agent including prompt formatting,
LLM interactions, tool execution, and feedback handling.
"""
def __init__(
self,
llm: Any,
task: Any,
crew: Any,
llm: BaseLLM,
task: Task,
crew: Crew | None,
agent: BaseAgent,
prompt: dict[str, str],
max_iter: int,
tools: List[CrewStructuredTool],
tools: list[CrewStructuredTool],
tools_names: str,
stop_words: List[str],
stop_words: list[str],
tools_description: str,
tools_handler: ToolsHandler,
step_callback: Any = None,
original_tools: List[Any] = [],
function_calling_llm: Any = None,
step_callback: Callable[[AgentAction | AgentFinish], None] | None = None,
original_tools: list[BaseTool] | None = None,
function_calling_llm: BaseLLM | None = None,
respect_context_window: bool = False,
request_within_rpm_limit: Optional[Callable[[], bool]] = None,
callbacks: List[Any] = [],
):
request_within_rpm_limit: Callable[[], bool] | None = None,
litellm_callbacks: list[Any] | None = None,
) -> None:
"""Initialize executor.
Args:
llm: Language model instance.
task: Task to execute.
crew: Optional Crew instance.
agent: Agent to execute.
prompt: Prompt templates.
max_iter: Maximum iterations.
tools: Available tools.
tools_names: Tool names string.
stop_words: Stop word list.
tools_description: Tool descriptions.
tools_handler: Tool handler instance.
step_callback: Optional step callback.
original_tools: Original tool list.
function_calling_llm: Optional function calling LLM.
respect_context_window: Respect context limits.
request_within_rpm_limit: RPM limit check function.
litellm_callbacks: Optional litellm callbacks list.
"""
self._i18n: I18N = I18N()
self.llm: BaseLLM = llm
self.llm = llm
self.task = task
self.agent = agent
self.crew = crew
self.crew: Crew | None = crew
self.prompt = prompt
self.tools = tools
self.tools_names = tools_names
self.stop = stop_words
self.max_iter = max_iter
self.callbacks = callbacks
self.litellm_callbacks = litellm_callbacks or []
self._printer: Printer = Printer()
self.tools_handler = tools_handler
self.original_tools = original_tools
self.original_tools = original_tools or []
self.step_callback = step_callback
self.use_stop_words = self.llm.supports_stop_words()
self.tools_description = tools_description
@@ -81,12 +118,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.respect_context_window = respect_context_window
self.request_within_rpm_limit = request_within_rpm_limit
self.ask_for_human_input = False
self.messages: List[Dict[str, str]] = []
self.messages: list[dict[str, str]] = []
self.iterations = 0
self.log_error_after = 3
self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = {
tool.name: tool for tool in self.tools
}
existing_stop = self.llm.stop or []
self.llm.stop = list(
set(
@@ -96,7 +130,19 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
)
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
def invoke(self, inputs: dict[str, str]) -> dict[str, str]:
"""Execute the agent with given inputs.
Args:
inputs: Input dictionary containing prompt variables.
Returns:
Dictionary with agent output.
Raises:
AssertionError: If agent fails to reach final answer.
Exception: If unknown error occurs during execution.
"""
if "system" in self.prompt:
system_prompt = self._format_prompt(self.prompt.get("system", ""), inputs)
user_prompt = self._format_prompt(self.prompt.get("user", ""), inputs)
@@ -122,7 +168,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
handle_unknown_error(self._printer, e)
raise
if self.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
@@ -132,9 +177,13 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
return {"output": formatted_answer.output}
def _invoke_loop(self) -> AgentFinish:
"""
Main loop to invoke the agent's thought process until it reaches a conclusion
or the maximum number of iterations is reached.
"""Execute agent loop until completion.
Returns:
Final answer from the agent.
Raises:
Exception: If litellm error or unknown error occurs.
"""
formatted_answer = None
while not isinstance(formatted_answer, AgentFinish):
@@ -146,7 +195,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
callbacks=self.litellm_callbacks,
)
enforce_rpm_limit(self.request_within_rpm_limit)
@@ -154,19 +203,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
callbacks=self.litellm_callbacks,
printer=self._printer,
from_task=self.task
from_task=self.task,
)
formatted_answer = process_llm_response(answer, self.use_stop_words)
if isinstance(formatted_answer, AgentAction):
# Extract agent fingerprint if available
fingerprint_context = {}
if (
self.agent
and hasattr(self.agent, "security_config")
and hasattr(self.agent.security_config, "fingerprint")
if hasattr(self.agent, "security_config") and hasattr(
self.agent.security_config, "fingerprint"
):
fingerprint_context = {
"agent_fingerprint": str(
@@ -179,8 +226,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
fingerprint_context=fingerprint_context,
tools=self.tools,
i18n=self._i18n,
agent_key=self.agent.key if self.agent else None,
agent_role=self.agent.role if self.agent else None,
agent_key=self.agent.key,
agent_role=self.agent.role,
tools_handler=self.tools_handler,
task=self.task,
agent=self.agent,
@@ -191,7 +238,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
self._invoke_step_callback(formatted_answer)
self._append_message(formatted_answer.text, role="assistant")
self._append_message(formatted_answer.text)
except OutputParserException as e:
formatted_answer = handle_output_parser_exception(
@@ -212,7 +259,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
callbacks=self.litellm_callbacks,
i18n=self._i18n,
)
continue
@@ -232,8 +279,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> Union[AgentAction, AgentFinish]:
"""Handle the AgentAction, execute tools, and process the results."""
) -> AgentAction | AgentFinish:
"""Process agent action and tool execution.
Args:
formatted_answer: Agent's action to execute.
tool_result: Result from tool execution.
Returns:
Updated action or final answer.
"""
# Special case for add_image_tool
add_image_tool = self._i18n.tools("add_image")
if (
@@ -252,88 +307,65 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
show_logs=self._show_logs,
)
def _invoke_step_callback(self, formatted_answer) -> None:
"""Invoke the step callback if it exists."""
def _invoke_step_callback(
self, formatted_answer: AgentAction | AgentFinish
) -> None:
"""Invoke step callback.
Args:
formatted_answer: Current agent response.
"""
if self.step_callback:
self.step_callback(formatted_answer)
def _append_message(self, text: str, role: str = "assistant") -> None:
"""Append a message to the message list with the given role."""
"""Add message to conversation history.
Args:
text: Message content.
role: Message role (default: assistant).
"""
self.messages.append(format_message_for_llm(text, role=role))
def _show_start_logs(self):
"""Show logs for the start of agent execution."""
if self.agent is None:
raise ValueError("Agent cannot be None")
def _show_start_logs(self) -> None:
"""Emit agent start event."""
crewai_event_bus.emit(
self.agent,
AgentLogsStartedEvent(
agent_role=self.agent.role,
task_description=(
getattr(self.task, "description") if self.task else "Not Found"
),
task_description=self.task.description,
verbose=self.agent.verbose
or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)),
or (self.crew.verbose if self.crew else False),
),
)
def _show_logs(self, formatted_answer: Union[AgentAction, AgentFinish]):
"""Show logs for the agent's execution."""
if self.agent is None:
raise ValueError("Agent cannot be None")
def _show_logs(self, formatted_answer: AgentAction | AgentFinish) -> None:
"""Emit agent execution event.
Args:
formatted_answer: Agent's response to log.
"""
crewai_event_bus.emit(
self.agent,
AgentLogsExecutionEvent(
agent_role=self.agent.role,
formatted_answer=formatted_answer,
verbose=self.agent.verbose
or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)),
or (self.crew.verbose if self.crew else False),
),
)
def _summarize_messages(self) -> None:
messages_groups = []
for message in self.messages:
content = message["content"]
cut_size = self.llm.get_context_window_size()
for i in range(0, len(content), cut_size):
messages_groups.append({"content": content[i : i + cut_size]})
summarized_contents = []
for group in messages_groups:
summary = self.llm.call(
[
format_message_for_llm(
self._i18n.slice("summarizer_system_message"), role="system"
),
format_message_for_llm(
self._i18n.slice("summarize_instruction").format(
group=group["content"]
),
),
],
callbacks=self.callbacks,
)
summarized_contents.append({"content": str(summary)})
merged_summary = " ".join(content["content"] for content in summarized_contents)
self.messages = [
format_message_for_llm(
self._i18n.slice("summary").format(merged_summary=merged_summary)
)
]
def _handle_crew_training_output(
self, result: AgentFinish, human_feedback: Optional[str] = None
self, result: AgentFinish, human_feedback: str | None = None
) -> None:
"""Handle the process of saving training data."""
agent_id = str(self.agent.id) # type: ignore
train_iteration = (
getattr(self.crew, "_train_iteration", None) if self.crew else None
)
"""Save training data.
Args:
result: Agent's final output.
human_feedback: Optional feedback from human.
"""
agent_id = str(self.agent.id)
train_iteration = getattr(self.crew, "_train_iteration", None)
if train_iteration is None or not isinstance(train_iteration, int):
self._printer.print(
@@ -372,20 +404,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
training_data[agent_id] = agent_training_data
training_handler.save(training_data)
def _format_prompt(self, prompt: str, inputs: Dict[str, str]) -> str:
@staticmethod
def _format_prompt(prompt: str, inputs: dict[str, str]) -> str:
"""Format prompt with input values.
Args:
prompt: Template string.
inputs: Values to substitute.
Returns:
Formatted prompt.
"""
prompt = prompt.replace("{input}", inputs["input"])
prompt = prompt.replace("{tool_names}", inputs["tool_names"])
prompt = prompt.replace("{tools}", inputs["tools"])
return prompt
def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish:
"""Handle human feedback with different flows for training vs regular use.
"""Process human feedback.
Args:
formatted_answer: The initial AgentFinish result to get feedback on
formatted_answer: Initial agent result.
Returns:
AgentFinish: The final answer after processing feedback
Final answer after feedback.
"""
human_feedback = self._ask_human_input(formatted_answer.output)
@@ -395,13 +437,25 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
return self._handle_regular_feedback(formatted_answer, human_feedback)
def _is_training_mode(self) -> bool:
"""Check if crew is in training mode."""
"""Check if training mode is active.
Returns:
True if in training mode.
"""
return bool(self.crew and self.crew._train)
def _handle_training_feedback(
self, initial_answer: AgentFinish, feedback: str
) -> AgentFinish:
"""Process feedback for training scenarios with single iteration."""
"""Process training feedback.
Args:
initial_answer: Initial agent output.
feedback: Training feedback.
Returns:
Improved answer.
"""
self._handle_crew_training_output(initial_answer, feedback)
self.messages.append(
format_message_for_llm(
@@ -416,7 +470,15 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _handle_regular_feedback(
self, current_answer: AgentFinish, initial_feedback: str
) -> AgentFinish:
"""Process feedback for regular use with potential multiple iterations."""
"""Process regular feedback iteratively.
Args:
current_answer: Current agent output.
initial_feedback: Initial user feedback.
Returns:
Final answer after iterations.
"""
feedback = initial_feedback
answer = current_answer
@@ -431,30 +493,17 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
return answer
def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
"""Process a single feedback iteration."""
"""Process single feedback iteration.
Args:
feedback: User feedback.
Returns:
Updated agent response.
"""
self.messages.append(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
return self._invoke_loop()
def _log_feedback_error(self, retry_count: int, error: Exception) -> None:
"""Log feedback processing errors."""
self._printer.print(
content=(
f"Error processing feedback: {error}. "
f"Retrying... ({retry_count + 1}/{MAX_LLM_RETRY})"
),
color="red",
)
def _log_max_retries_exceeded(self) -> None:
"""Log when max retries for feedback processing are exceeded."""
self._printer.print(
content=(
f"Failed to process feedback after {MAX_LLM_RETRY} attempts. "
"Ending feedback loop."
),
color="red",
)
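Since _format_prompt is now a @staticmethod doing literal .replace substitution rather than str.format, stray braces inside tool descriptions cannot raise KeyError. A minimal sketch (prompt and inputs invented; the import path is assumed):

from crewai.agents.crew_agent_executor import CrewAgentExecutor

prompt = "Task: {input}\nTools available:\n{tools}\nUse one of: {tool_names}"
inputs = {
    "input": "Summarize today's AI news",
    "tools": "search(query) - web search\nscrape(url) - fetch a page",
    "tool_names": "search, scrape",
}
# Only the three known placeholders are replaced; other braces are left alone.
print(CrewAgentExecutor._format_prompt(prompt, inputs))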

View File

@@ -1,50 +1,67 @@
import re
from typing import Any, Optional, Union
from json_repair import repair_json
from crewai.utilities import I18N
"""Agent output parsing module for ReAct-style LLM responses.
This module provides parsing functionality for agent outputs that follow
the ReAct (Reasoning and Acting) format, converting them into structured
AgentAction or AgentFinish objects.
"""
from dataclasses import dataclass
FINAL_ANSWER_ACTION = "Final Answer:"
MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE = "I did it wrong. Invalid Format: I missed the 'Action:' after 'Thought:'. I will do right next, and don't use a tool I have already used.\n"
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE = "I did it wrong. Invalid Format: I missed the 'Action Input:' after 'Action:'. I will do right next, and don't use a tool I have already used.\n"
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE = "I did it wrong. Tried to both perform Action and give a Final Answer at the same time, I must do one or the other"
from json_repair import repair_json # type: ignore[import-untyped]
from crewai.agents.constants import (
ACTION_INPUT_ONLY_REGEX,
ACTION_INPUT_REGEX,
ACTION_REGEX,
FINAL_ANSWER_ACTION,
MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE,
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
UNABLE_TO_REPAIR_JSON_RESULTS,
)
from crewai.utilities.i18n import I18N
_I18N = I18N()
@dataclass
class AgentAction:
"""Represents an action to be taken by an agent."""
thought: str
tool: str
tool_input: str
text: str
result: str
def __init__(self, thought: str, tool: str, tool_input: str, text: str):
self.thought = thought
self.tool = tool
self.tool_input = tool_input
self.text = text
result: str | None = None
@dataclass
class AgentFinish:
"""Represents the final answer from an agent."""
thought: str
output: str
text: str
def __init__(self, thought: str, output: str, text: str):
self.thought = thought
self.output = output
self.text = text
class OutputParserException(Exception):
error: str
def __init__(self, error: str):
"""Exception raised when output parsing fails.
Attributes:
error: The error message.
"""
def __init__(self, error: str) -> None:
"""Initialize OutputParserException.
Args:
error: The error message.
"""
self.error = error
super().__init__(error)
class CrewAgentParser:
"""Parses ReAct-style LLM calls that have a single tool input.
Expects output to be in one of two formats.
@@ -62,108 +79,117 @@ class CrewAgentParser:
Thought: agent thought here
Final Answer: The temperature is 100 degrees
"""
_i18n: I18N = I18N()
agent: Any = None
def __init__(self, agent: Optional[Any] = None):
self.agent = agent
@staticmethod
def parse_text(text: str) -> Union[AgentAction, AgentFinish]:
"""
Static method to parse text into an AgentAction or AgentFinish without needing to instantiate the class.
Args:
text: The text to parse.
Returns:
Either an AgentAction or AgentFinish based on the parsed content.
"""
parser = CrewAgentParser()
return parser.parse(text)
def parse(self, text: str) -> Union[AgentAction, AgentFinish]:
thought = self._extract_thought(text)
includes_answer = FINAL_ANSWER_ACTION in text
regex = (
r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
)
action_match = re.search(regex, text, re.DOTALL)
if includes_answer:
final_answer = text.split(FINAL_ANSWER_ACTION)[-1].strip()
# Check whether the final answer ends with triple backticks.
if final_answer.endswith("```"):
# Count occurrences of triple backticks in the final answer.
count = final_answer.count("```")
# If count is odd then it's an unmatched trailing set; remove it.
if count % 2 != 0:
final_answer = final_answer[:-3].rstrip()
return AgentFinish(thought, final_answer, text)
elif action_match:
action = action_match.group(1)
clean_action = self._clean_action(action)
action_input = action_match.group(2).strip()
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = self._safe_repair_json(tool_input)
return AgentAction(thought, clean_action, safe_tool_input, text)
if not re.search(r"Action\s*\d*\s*:[\s]*(.*?)", text, re.DOTALL):
raise OutputParserException(
f"{MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE}\n{self._i18n.slice('final_answer_format')}",
)
elif not re.search(
r"[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)", text, re.DOTALL
):
raise OutputParserException(
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
)
else:
format = self._i18n.slice("format_without_tools")
error = f"{format}"
raise OutputParserException(
error,
)
def _extract_thought(self, text: str) -> str:
thought_index = text.find("\nAction")
if thought_index == -1:
thought_index = text.find("\nFinal Answer")
if thought_index == -1:
return ""
thought = text[:thought_index].strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
def _clean_action(self, text: str) -> str:
"""Clean action string by removing non-essential formatting characters."""
return text.strip().strip("*").strip()
def _safe_repair_json(self, tool_input: str) -> str:
UNABLE_TO_REPAIR_JSON_RESULTS = ['""', "{}"]
# Skip repair if the input starts and ends with square brackets
# Explanation: The JSON parser has issues handling inputs that are enclosed in square brackets ('[]').
# These are typically valid JSON arrays or strings that do not require repair. Attempting to repair such inputs
# might lead to unintended alterations, such as wrapping the entire input in additional layers or modifying
# the structure in a way that changes its meaning. By skipping the repair for inputs that start and end with
# square brackets, we preserve the integrity of these valid JSON structures and avoid unnecessary modifications.
if tool_input.startswith("[") and tool_input.endswith("]"):
return tool_input
# Before repair, handle common LLM issues:
# 1. Replace """ with " to avoid JSON parser errors
tool_input = tool_input.replace('"""', '"')
result = repair_json(tool_input)
if result in UNABLE_TO_REPAIR_JSON_RESULTS:
return tool_input
return str(result)
def parse(text: str) -> AgentAction | AgentFinish:
"""Parse agent output text into AgentAction or AgentFinish.
Expects output to be in one of two formats.
Thought: agent thought here
Final Answer: The temperature is 100 degrees
Args:
text: The agent output text to parse.
Returns:
AgentAction or AgentFinish based on the content.
Raises:
OutputParserException: If the text format is invalid.
"""
thought = _extract_thought(text)
includes_answer = FINAL_ANSWER_ACTION in text
action_match = ACTION_INPUT_REGEX.search(text)
if includes_answer:
final_answer = text.split(FINAL_ANSWER_ACTION)[-1].strip()
# Check whether the final answer ends with triple backticks.
if final_answer.endswith("```"):
# Count occurrences of triple backticks in the final answer.
count = final_answer.count("```")
# If count is odd then it's an unmatched trailing set; remove it.
if count % 2 != 0:
final_answer = final_answer[:-3].rstrip()
return AgentFinish(thought=thought, output=final_answer, text=text)
elif action_match:
action = action_match.group(1)
clean_action = _clean_action(action)
action_input = action_match.group(2).strip()
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = _safe_repair_json(tool_input)
return AgentAction(
thought=thought, tool=clean_action, tool_input=safe_tool_input, text=text
)
if not ACTION_REGEX.search(text):
raise OutputParserException(
f"{MISSING_ACTION_AFTER_THOUGHT_ERROR_MESSAGE}\n{_I18N.slice('final_answer_format')}",
)
elif not ACTION_INPUT_ONLY_REGEX.search(text):
raise OutputParserException(
MISSING_ACTION_INPUT_AFTER_ACTION_ERROR_MESSAGE,
)
else:
err_format = _I18N.slice("format_without_tools")
error = f"{err_format}"
raise OutputParserException(
error,
)
def _extract_thought(text: str) -> str:
"""Extract the thought portion from the text.
Args:
text: The full agent output text.
Returns:
The extracted thought string.
"""
thought_index = text.find("\nAction")
if thought_index == -1:
thought_index = text.find("\nFinal Answer")
if thought_index == -1:
return ""
thought = text[:thought_index].strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
def _clean_action(text: str) -> str:
"""Clean action string by removing non-essential formatting characters.
Args:
text: The action text to clean.
Returns:
The cleaned action string.
"""
return text.strip().strip("*").strip()
def _safe_repair_json(tool_input: str) -> str:
"""Safely repair JSON input.
Args:
tool_input: The tool input string to repair.
Returns:
The repaired JSON string or original if repair fails.
"""
# Skip repair if the input starts and ends with square brackets
# Explanation: The JSON parser has issues handling inputs that are enclosed in square brackets ('[]').
# These are typically valid JSON arrays or strings that do not require repair. Attempting to repair such inputs
# might lead to unintended alterations, such as wrapping the entire input in additional layers or modifying
# the structure in a way that changes its meaning. By skipping the repair for inputs that start and end with
# square brackets, we preserve the integrity of these valid JSON structures and avoid unnecessary modifications.
if tool_input.startswith("[") and tool_input.endswith("]"):
return tool_input
# Before repair, handle common LLM issues:
# 1. Replace """ with " to avoid JSON parser errors
tool_input = tool_input.replace('"""', '"')
result = repair_json(tool_input)
if result in UNABLE_TO_REPAIR_JSON_RESULTS:
return tool_input
return str(result)
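Both branches of the new module-level parse function in action (a sketch; the sample completions are invented, and the module-level parse shown above is assumed to be importable from crewai.agents.parser):

from crewai.agents.parser import AgentAction, AgentFinish, parse

action_text = 'Thought: I need data\nAction: search\nAction Input: {"query": "weather"}'
result = parse(action_text)
assert isinstance(result, AgentAction) and result.tool == "search"

finish_text = "Thought: I am done\nFinal Answer: The temperature is 100 degrees"
result = parse(finish_text)
assert isinstance(result, AgentFinish) and result.output == "The temperature is 100 degrees"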

View File

@@ -1,32 +1,44 @@
from typing import Any, Optional, Union
"""Tools handler for managing tool execution and caching."""
from ..tools.cache_tools.cache_tools import CacheTools
from ..tools.tool_calling import InstructorToolCalling, ToolCalling
from .cache.cache_handler import CacheHandler
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.tools.cache_tools.cache_tools import CacheTools
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
class ToolsHandler:
"""Callback handler for tool usage."""
"""Callback handler for tool usage.
last_used_tool: ToolCalling = {} # type: ignore # BUG?: Incompatible types in assignment (expression has type "Dict[...]", variable has type "ToolCalling")
cache: Optional[CacheHandler]
Attributes:
last_used_tool: The most recently used tool calling instance.
cache: Optional cache handler for storing tool outputs.
"""
def __init__(self, cache: Optional[CacheHandler] = None):
"""Initialize the callback handler."""
self.cache = cache
self.last_used_tool = {} # type: ignore # BUG?: same as above
def __init__(self, cache: CacheHandler | None = None) -> None:
"""Initialize the callback handler.
Args:
cache: Optional cache handler for storing tool outputs.
"""
self.cache: CacheHandler | None = cache
self.last_used_tool: ToolCalling | InstructorToolCalling | None = None
def on_tool_use(
self,
calling: Union[ToolCalling, InstructorToolCalling],
calling: ToolCalling | InstructorToolCalling,
output: str,
should_cache: bool = True,
) -> Any:
"""Run when tool ends running."""
self.last_used_tool = calling # type: ignore # BUG?: Incompatible types in assignment (expression has type "Union[ToolCalling, InstructorToolCalling]", variable has type "ToolCalling")
) -> None:
"""Run when tool ends running.
Args:
calling: The tool calling instance.
output: The output from the tool execution.
should_cache: Whether to cache the tool output.
"""
self.last_used_tool = calling
if self.cache and should_cache and calling.tool_name != CacheTools().name:
self.cache.add(
tool=calling.tool_name,
input=calling.arguments,
input_data=calling.arguments,
output=output,
)
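A short sketch of how the handler and cache now cooperate (assuming ToolCalling accepts tool_name and arguments as keyword fields; the values are illustrative): on_tool_use records the last call and, when caching is allowed, stores the output under the renamed input_data key.

from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.tools_handler import ToolsHandler
from crewai.tools.tool_calling import ToolCalling

cache = CacheHandler()
handler = ToolsHandler(cache=cache)
calling = ToolCalling(tool_name="search", arguments={"query": "weather"})
handler.on_tool_use(calling=calling, output="sunny")
assert handler.last_used_tool is calling
assert cache.read(tool="search", input_data={"query": "weather"}) == "sunny"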

View File

@@ -1,6 +1 @@
ALGORITHMS = ["RS256"]
#TODO: The AUTH0 constants should be removed after WorkOS migration is completed
AUTH0_DOMAIN = "crewai.us.auth0.com"
AUTH0_CLIENT_ID = "DEVC5Fw6NlRoSzmDCcOhVq85EfLBjKa8"
AUTH0_AUDIENCE = "https://crewai.us.auth0.com/api/v2/"

View File

@@ -7,24 +7,27 @@ from rich.console import Console
from pydantic import BaseModel, Field
from .utils import TokenManager, validate_jwt_token
from urllib.parse import quote
from crewai.cli.plus_api import PlusAPI
from .utils import validate_jwt_token
from crewai.cli.shared.token_manager import TokenManager
from crewai.cli.config import Settings
from crewai.cli.authentication.constants import (
AUTH0_AUDIENCE,
AUTH0_CLIENT_ID,
AUTH0_DOMAIN,
)
console = Console()
class Oauth2Settings(BaseModel):
provider: str = Field(description="OAuth2 provider used for authentication (e.g., workos, okta, auth0).")
client_id: str = Field(description="OAuth2 client ID issued by the provider, used during authentication requests.")
domain: str = Field(description="OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens.")
audience: Optional[str] = Field(description="OAuth2 audience value, typically used to identify the target API or resource.", default=None)
provider: str = Field(
description="OAuth2 provider used for authentication (e.g., workos, okta, auth0)."
)
client_id: str = Field(
description="OAuth2 client ID issued by the provider, used during authentication requests."
)
domain: str = Field(
description="OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens."
)
audience: Optional[str] = Field(
description="OAuth2 audience value, typically used to identify the target API or resource.",
default=None,
)
@classmethod
def from_settings(cls):
@@ -44,11 +47,15 @@ class ProviderFactory:
settings = settings or Oauth2Settings.from_settings()
import importlib
module = importlib.import_module(f"crewai.cli.authentication.providers.{settings.provider.lower()}")
module = importlib.import_module(
f"crewai.cli.authentication.providers.{settings.provider.lower()}"
)
provider = getattr(module, f"{settings.provider.capitalize()}Provider")
return provider(settings)
class AuthenticationCommand:
def __init__(self):
self.token_manager = TokenManager()
@@ -58,26 +65,12 @@ class AuthenticationCommand:
"""Sign up to CrewAI+"""
console.print("Signing in to CrewAI Enterprise...\n", style="bold blue")
# TODO: WORKOS - Next line and conditional are temporary until migration to WorkOS is complete.
user_provider = self._determine_user_provider()
if user_provider == "auth0":
settings = Oauth2Settings(
provider="auth0",
client_id=AUTH0_CLIENT_ID,
domain=AUTH0_DOMAIN,
audience=AUTH0_AUDIENCE
)
self.oauth2_provider = ProviderFactory.from_settings(settings)
# End of temporary code.
device_code_data = self._get_device_code()
self._display_auth_instructions(device_code_data)
return self._poll_for_token(device_code_data)
def _get_device_code(
self
) -> Dict[str, Any]:
def _get_device_code(self) -> Dict[str, Any]:
"""Get the device code to authenticate the user."""
device_code_payload = {
@@ -86,7 +79,9 @@ class AuthenticationCommand:
"audience": self.oauth2_provider.get_audience(),
}
response = requests.post(
url=self.oauth2_provider.get_authorize_url(), data=device_code_payload, timeout=20
url=self.oauth2_provider.get_authorize_url(),
data=device_code_payload,
timeout=20,
)
response.raise_for_status()
return response.json()
@@ -97,9 +92,7 @@ class AuthenticationCommand:
console.print("2. Enter the following code: ", device_code_data["user_code"])
webbrowser.open(device_code_data["verification_uri_complete"])
def _poll_for_token(
self, device_code_data: Dict[str, Any]
) -> None:
def _poll_for_token(self, device_code_data: Dict[str, Any]) -> None:
"""Polls the server for the token until it is received, or max attempts are reached."""
token_payload = {
@@ -112,7 +105,9 @@ class AuthenticationCommand:
attempts = 0
while True and attempts < 10:
response = requests.post(self.oauth2_provider.get_token_url(), data=token_payload, timeout=30)
response = requests.post(
self.oauth2_provider.get_token_url(), data=token_payload, timeout=30
)
token_data = response.json()
if response.status_code == 200:
@@ -192,30 +187,3 @@ class AuthenticationCommand:
"\nRun [bold]crewai login[/bold] to try logging in again.\n",
style="yellow",
)
# TODO: WORKOS - This method is temporary until migration to WorkOS is complete.
def _determine_user_provider(self) -> str:
"""Determine which provider to use for authentication."""
console.print(
"Enter your CrewAI Enterprise account email: ", style="bold blue", end=""
)
email = input()
email_encoded = quote(email)
# It's not correct to call this method directly, but it's temporary until migration is complete.
response = PlusAPI("")._make_request(
"GET", f"/crewai_plus/api/v1/me/provider?email={email_encoded}"
)
if response.status_code == 200:
if response.json().get("provider") == "auth0":
return "auth0"
else:
return "workos"
else:
console.print(
"Error: Failed to authenticate with crewai enterprise. Ensure that you are using the latest crewai version and please try again. If the problem persists, contact support@crewai.com.",
style="red",
)
raise SystemExit
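The ProviderFactory above resolves providers by naming convention rather than a registry. A distilled sketch of the same importlib pattern (provider names and the resulting module/class are hypothetical, e.g. "okta" -> crewai.cli.authentication.providers.okta.OktaProvider):

import importlib

def load_provider(provider: str) -> type:
    # Map a settings value like "okta" to its provider module and class.
    module = importlib.import_module(
        f"crewai.cli.authentication.providers.{provider.lower()}"
    )
    return getattr(module, f"{provider.capitalize()}Provider")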

View File

@@ -1,4 +1,4 @@
from .utils import TokenManager
from crewai.cli.shared.token_manager import TokenManager
class AuthError(Exception):

View File

@@ -1,12 +1,5 @@
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
import jwt
from jwt import PyJWKClient
from cryptography.fernet import Fernet
def validate_jwt_token(
@@ -67,118 +60,3 @@ def validate_jwt_token(
raise Exception(f"JWKS or key processing error: {str(e)}")
except jwt.InvalidTokenError as e:
raise Exception(f"Invalid token: {str(e)}")
class TokenManager:
def __init__(self, file_path: str = "tokens.enc") -> None:
"""
Initialize the TokenManager class.
:param file_path: The file path to store the encrypted tokens. Default is "tokens.enc".
"""
self.file_path = file_path
self.key = self._get_or_create_key()
self.fernet = Fernet(self.key)
def _get_or_create_key(self) -> bytes:
"""
Get or create the encryption key.
:return: The encryption key.
"""
key_filename = "secret.key"
key = self.read_secure_file(key_filename)
if key is not None:
return key
new_key = Fernet.generate_key()
self.save_secure_file(key_filename, new_key)
return new_key
def save_tokens(self, access_token: str, expires_at: int) -> None:
"""
Save the access token and its expiration time.
:param access_token: The access token to save.
:param expires_at: The UNIX timestamp of the expiration time.
"""
expiration_time = datetime.fromtimestamp(expires_at)
data = {
"access_token": access_token,
"expiration": expiration_time.isoformat(),
}
encrypted_data = self.fernet.encrypt(json.dumps(data).encode())
self.save_secure_file(self.file_path, encrypted_data)
def get_token(self) -> Optional[str]:
"""
Get the access token if it is valid and not expired.
:return: The access token if valid and not expired, otherwise None.
"""
encrypted_data = self.read_secure_file(self.file_path)
decrypted_data = self.fernet.decrypt(encrypted_data) # type: ignore
data = json.loads(decrypted_data)
expiration = datetime.fromisoformat(data["expiration"])
if expiration <= datetime.now():
return None
return data["access_token"]
def get_secure_storage_path(self) -> Path:
"""
Get the secure storage path based on the operating system.
:return: The secure storage path.
"""
if sys.platform == "win32":
# Windows: Use %LOCALAPPDATA%
base_path = os.environ.get("LOCALAPPDATA")
elif sys.platform == "darwin":
# macOS: Use ~/Library/Application Support
base_path = os.path.expanduser("~/Library/Application Support")
else:
# Linux and other Unix-like: Use ~/.local/share
base_path = os.path.expanduser("~/.local/share")
app_name = "crewai/credentials"
storage_path = Path(base_path) / app_name
storage_path.mkdir(parents=True, exist_ok=True)
return storage_path
def save_secure_file(self, filename: str, content: bytes) -> None:
"""
Save the content to a secure file.
:param filename: The name of the file.
:param content: The content to save.
"""
storage_path = self.get_secure_storage_path()
file_path = storage_path / filename
with open(file_path, "wb") as f:
f.write(content)
# Set appropriate permissions (read/write for owner only)
os.chmod(file_path, 0o600)
def read_secure_file(self, filename: str) -> Optional[bytes]:
"""
Read the content of a secure file.
:param filename: The name of the file.
:return: The content of the file if it exists, otherwise None.
"""
storage_path = self.get_secure_storage_path()
file_path = storage_path / filename
if not file_path.exists():
return None
with open(file_path, "rb") as f:
return f.read()

View File

@@ -11,6 +11,7 @@ from crewai.cli.constants import (
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID,
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN,
)
from crewai.cli.shared.token_manager import TokenManager
DEFAULT_CONFIG_PATH = Path.home() / ".config" / "crewai" / "settings.json"
@@ -53,6 +54,7 @@ HIDDEN_SETTINGS_KEYS = [
"tool_repository_password",
]
class Settings(BaseModel):
enterprise_base_url: Optional[str] = Field(
default=DEFAULT_CLI_SETTINGS["enterprise_base_url"],
@@ -74,12 +76,12 @@ class Settings(BaseModel):
oauth2_provider: str = Field(
description="OAuth2 provider used for authentication (e.g., workos, okta, auth0).",
default=DEFAULT_CLI_SETTINGS["oauth2_provider"]
default=DEFAULT_CLI_SETTINGS["oauth2_provider"],
)
oauth2_audience: Optional[str] = Field(
description="OAuth2 audience value, typically used to identify the target API or resource.",
default=DEFAULT_CLI_SETTINGS["oauth2_audience"]
default=DEFAULT_CLI_SETTINGS["oauth2_audience"],
)
oauth2_client_id: str = Field(
@@ -89,7 +91,7 @@ class Settings(BaseModel):
oauth2_domain: str = Field(
description="OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens.",
default=DEFAULT_CLI_SETTINGS["oauth2_domain"]
default=DEFAULT_CLI_SETTINGS["oauth2_domain"],
)
def __init__(self, config_path: Path = DEFAULT_CONFIG_PATH, **data):
@@ -116,6 +118,7 @@ class Settings(BaseModel):
"""Reset all settings to default values"""
self._reset_user_settings()
self._reset_cli_settings()
self._clear_auth_tokens()
self.dump()
def dump(self) -> None:
@@ -139,3 +142,7 @@ class Settings(BaseModel):
"""Reset all CLI settings to default values"""
for key in CLI_SETTINGS_KEYS:
setattr(self, key, DEFAULT_CLI_SETTINGS.get(key))
def _clear_auth_tokens(self) -> None:
"""Clear all authentication tokens"""
TokenManager().clear_tokens()

View File

@@ -117,17 +117,19 @@ class PlusAPI:
def get_organizations(self) -> requests.Response:
return self._make_request("GET", self.ORGANIZATIONS_RESOURCE)
def send_trace_batch(self, payload) -> requests.Response:
return self._make_request("POST", self.TRACING_RESOURCE, json=payload)
def initialize_trace_batch(self, payload) -> requests.Response:
return self._make_request(
"POST", f"{self.TRACING_RESOURCE}/batches", json=payload
"POST",
f"{self.TRACING_RESOURCE}/batches",
json=payload,
timeout=30,
)
def initialize_ephemeral_trace_batch(self, payload) -> requests.Response:
return self._make_request(
"POST", f"{self.EPHEMERAL_TRACING_RESOURCE}/batches", json=payload
"POST",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches",
json=payload,
)
def send_trace_events(self, trace_batch_id: str, payload) -> requests.Response:
@@ -135,6 +137,7 @@ class PlusAPI:
"POST",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/events",
json=payload,
timeout=30,
)
def send_ephemeral_trace_events(
@@ -144,6 +147,7 @@ class PlusAPI:
"POST",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/events",
json=payload,
timeout=30,
)
def finalize_trace_batch(self, trace_batch_id: str, payload) -> requests.Response:
@@ -151,6 +155,7 @@ class PlusAPI:
"PATCH",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
timeout=30,
)
def finalize_ephemeral_trace_batch(
@@ -160,4 +165,5 @@ class PlusAPI:
"PATCH",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
timeout=30,
)

View File

@@ -10,8 +10,9 @@ console = Console()
class SettingsCommand(BaseCommand):
"""A class to handle CLI configuration commands."""
def __init__(self, settings_kwargs: dict[str, Any] = {}):
def __init__(self, settings_kwargs: dict[str, Any] | None = None):
super().__init__()
settings_kwargs = settings_kwargs or {}
self.settings = Settings(**settings_kwargs)
def list(self) -> None:

View File

@@ -0,0 +1,141 @@
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
from cryptography.fernet import Fernet
class TokenManager:
def __init__(self, file_path: str = "tokens.enc") -> None:
"""
Initialize the TokenManager class.
:param file_path: The file path to store the encrypted tokens. Default is "tokens.enc".
"""
self.file_path = file_path
self.key = self._get_or_create_key()
self.fernet = Fernet(self.key)
def _get_or_create_key(self) -> bytes:
"""
Get or create the encryption key.
:return: The encryption key.
"""
key_filename = "secret.key"
key = self.read_secure_file(key_filename)
if key is not None:
return key
new_key = Fernet.generate_key()
self.save_secure_file(key_filename, new_key)
return new_key
def save_tokens(self, access_token: str, expires_at: int) -> None:
"""
Save the access token and its expiration time.
:param access_token: The access token to save.
:param expires_at: The UNIX timestamp of the expiration time.
"""
expiration_time = datetime.fromtimestamp(expires_at)
data = {
"access_token": access_token,
"expiration": expiration_time.isoformat(),
}
encrypted_data = self.fernet.encrypt(json.dumps(data).encode())
self.save_secure_file(self.file_path, encrypted_data)
def get_token(self) -> Optional[str]:
"""
Get the access token if it is valid and not expired.
:return: The access token if valid and not expired, otherwise None.
"""
encrypted_data = self.read_secure_file(self.file_path)
if encrypted_data is None:
return None
decrypted_data = self.fernet.decrypt(encrypted_data) # type: ignore
data = json.loads(decrypted_data)
expiration = datetime.fromisoformat(data["expiration"])
if expiration <= datetime.now():
return None
return data["access_token"]
def clear_tokens(self) -> None:
"""
Clear the tokens.
"""
self.delete_secure_file(self.file_path)
def get_secure_storage_path(self) -> Path:
"""
Get the secure storage path based on the operating system.
:return: The secure storage path.
"""
if sys.platform == "win32":
# Windows: Use %LOCALAPPDATA%
base_path = os.environ.get("LOCALAPPDATA")
elif sys.platform == "darwin":
# macOS: Use ~/Library/Application Support
base_path = os.path.expanduser("~/Library/Application Support")
else:
# Linux and other Unix-like: Use ~/.local/share
base_path = os.path.expanduser("~/.local/share")
app_name = "crewai/credentials"
storage_path = Path(base_path) / app_name
storage_path.mkdir(parents=True, exist_ok=True)
return storage_path
def save_secure_file(self, filename: str, content: bytes) -> None:
"""
Save the content to a secure file.
:param filename: The name of the file.
:param content: The content to save.
"""
storage_path = self.get_secure_storage_path()
file_path = storage_path / filename
with open(file_path, "wb") as f:
f.write(content)
# Set appropriate permissions (read/write for owner only)
os.chmod(file_path, 0o600)
def read_secure_file(self, filename: str) -> Optional[bytes]:
"""
Read the content of a secure file.
:param filename: The name of the file.
:return: The content of the file if it exists, otherwise None.
"""
storage_path = self.get_secure_storage_path()
file_path = storage_path / filename
if not file_path.exists():
return None
with open(file_path, "rb") as f:
return f.read()
def delete_secure_file(self, filename: str) -> None:
"""
Delete the secure file.
:param filename: The name of the file.
"""
storage_path = self.get_secure_storage_path()
file_path = storage_path / filename
if file_path.exists():
file_path.unlink(missing_ok=True)
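A round-trip sketch of the relocated TokenManager (token value illustrative; note this writes real encrypted files under the platform storage path shown above):

import time
from crewai.cli.shared.token_manager import TokenManager

manager = TokenManager()
# Store a token that expires in one hour; it is Fernet-encrypted on disk.
manager.save_tokens("my-access-token", expires_at=int(time.time()) + 3600)
assert manager.get_token() == "my-access-token"
# clear_tokens deletes tokens.enc, so the next read returns None.
manager.clear_tokens()
assert manager.get_token() is None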

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.165.1,<1.0.0"
"crewai[tools]>=0.177.0,<1.0.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.165.1,<1.0.0",
"crewai[tools]>=0.177.0,<1.0.0",
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.165.1"
"crewai[tools]>=0.177.0"
]
[tool.crewai]

View File

@@ -3,26 +3,18 @@ import json
import re
import uuid
import warnings
from collections.abc import Callable, Mapping, Set
from concurrent.futures import Future
from copy import copy as shallow_copy
from hashlib import md5
from typing import (
Any,
Callable,
Dict,
List,
Optional,
Set,
Tuple,
Union,
cast,
)
from opentelemetry import baggage
from opentelemetry.context import attach, detach
from crewai.utilities.crew.models import CrewContext
from pydantic import (
UUID4,
BaseModel,
@@ -34,15 +26,36 @@ from pydantic import (
model_validator,
)
from pydantic_core import PydanticCustomError
from typing_extensions import Self
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache import CacheHandler
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.crews.crew_output import CrewOutput
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
CrewTestStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from crewai.flow.flow_trackable import FlowTrackable
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.llm import LLM, BaseLLM
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.external.external_memory import ExternalMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
@@ -57,29 +70,9 @@ from crewai.tools.base_tool import BaseTool, Tool
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import NOT_SPECIFIED, TRAINING_DATA_FILE
from crewai.utilities.crew.models import CrewContext
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.events.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
CrewTestCompletedEvent,
CrewTestFailedEvent,
CrewTestStartedEvent,
CrewTrainCompletedEvent,
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.event_listener import EventListener
from crewai.utilities.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.utilities.events.listeners.tracing.utils import (
is_tracing_enabled,
)
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -116,9 +109,12 @@ class Crew(FlowTrackable, BaseModel):
planning: Plan the crew execution and add the plan to the crew.
chat_llm: The language model used for orchestrating chat interactions with the crew.
security_config: Security configuration for the crew, including fingerprinting.
Notes:
TODO: Improve the embedder type from dict[str, Any] to a more specific TypedDict or dataclass.
"""
__hash__ = object.__hash__ # type: ignore
__hash__ = object.__hash__
_execution_span: Any = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr()
_logger: Logger = PrivateAttr()
@@ -130,7 +126,7 @@ class Crew(FlowTrackable, BaseModel):
_external_memory: Optional[InstanceOf[ExternalMemory]] = PrivateAttr()
_train: Optional[bool] = PrivateAttr(default=False)
_train_iteration: Optional[int] = PrivateAttr()
_inputs: Optional[Dict[str, Any]] = PrivateAttr(default=None)
_inputs: Optional[dict[str, Any]] = PrivateAttr(default=None)
_logging_color: str = PrivateAttr(
default="bold_purple",
)
@@ -140,8 +136,8 @@ class Crew(FlowTrackable, BaseModel):
name: Optional[str] = Field(default="crew")
cache: bool = Field(default=True)
tasks: List[Task] = Field(default_factory=list)
agents: List[BaseAgent] = Field(default_factory=list)
tasks: list[Task] = Field(default_factory=list)
agents: list[BaseAgent] = Field(default_factory=list)
process: Process = Field(default=Process.sequential)
verbose: bool = Field(default=False)
memory: bool = Field(
@@ -164,7 +160,7 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="An Instance of the ExternalMemory to be used by the Crew",
)
embedder: Optional[dict] = Field(
embedder: Optional[dict[str, Any]] = Field(
default=None,
description="Configuration for the embedder to be used for the crew.",
)
@@ -172,16 +168,16 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="Metrics for the LLM usage during all tasks execution.",
)
manager_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
manager_llm: Optional[str | InstanceOf[BaseLLM] | Any] = Field(
description="Language model that will run the agent.", default=None
)
manager_agent: Optional[BaseAgent] = Field(
description="Custom agent that will be used as manager.", default=None
)
function_calling_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
function_calling_llm: Optional[str | InstanceOf[LLM] | Any] = Field(
description="Language model that will run the agent.", default=None
)
config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None)
config: Optional[Json[dict[str, Any]] | dict[str, Any]] = Field(default=None)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
share_crew: Optional[bool] = Field(default=False)
step_callback: Optional[Any] = Field(
@@ -192,13 +188,13 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="Callback to be executed after each task for all agents execution.",
)
before_kickoff_callbacks: List[
Callable[[Optional[Dict[str, Any]]], Optional[Dict[str, Any]]]
before_kickoff_callbacks: list[
Callable[[Optional[dict[str, Any]]], Optional[dict[str, Any]]]
] = Field(
default_factory=list,
description="List of callbacks to be executed before crew kickoff. It may be used to adjust inputs before the crew is executed.",
)
after_kickoff_callbacks: List[Callable[[CrewOutput], CrewOutput]] = Field(
after_kickoff_callbacks: list[Callable[[CrewOutput], CrewOutput]] = Field(
default_factory=list,
description="List of callbacks to be executed after crew kickoff. It may be used to adjust the output of the crew.",
)
@@ -210,7 +206,7 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="Path to the prompt json file to be used for the crew.",
)
output_log_file: Optional[Union[bool, str]] = Field(
output_log_file: Optional[bool | str] = Field(
default=None,
description="Path to the log file to be saved",
)
@@ -218,23 +214,23 @@ class Crew(FlowTrackable, BaseModel):
default=False,
description="Plan the crew execution and add the plan to the crew.",
)
planning_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
planning_llm: Optional[str | InstanceOf[BaseLLM] | Any] = Field(
default=None,
description="Language model that will run the AgentPlanner if planning is True.",
)
task_execution_output_json_files: Optional[List[str]] = Field(
task_execution_output_json_files: Optional[list[str]] = Field(
default=None,
description="List of file paths for task execution JSON files.",
)
execution_logs: List[Dict[str, Any]] = Field(
execution_logs: list[dict[str, Any]] = Field(
default=[],
description="List of execution logs for tasks",
)
knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
knowledge_sources: Optional[list[BaseKnowledgeSource]] = Field(
default=None,
description="Knowledge sources for the crew. Add knowledge sources to the knowledge object.",
)
chat_llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
chat_llm: Optional[str | InstanceOf[BaseLLM] | Any] = Field(
default=None,
description="LLM used to handle chatting with the crew.",
)
@@ -267,8 +263,8 @@ class Crew(FlowTrackable, BaseModel):
@field_validator("config", mode="before")
@classmethod
def check_config_type(
cls, v: Union[Json, Dict[str, Any]]
) -> Union[Json, Dict[str, Any]]:
cls, v: Json[dict[str, Any]] | dict[str, Any]
) -> Json[dict[str, Any]] | dict[str, Any]:
"""Validates that the config is a valid type.
Args:
v: The config to be validated.
@@ -277,10 +273,10 @@ class Crew(FlowTrackable, BaseModel):
"""
# TODO: Improve typing
return json.loads(v) if isinstance(v, Json) else v # type: ignore
return json.loads(v) if isinstance(v, str) else v
@model_validator(mode="after")
def set_private_attrs(self) -> "Crew":
def set_private_attrs(self) -> Self:
"""Set private attributes."""
self._cache_handler = CacheHandler()
@@ -300,7 +296,7 @@ class Crew(FlowTrackable, BaseModel):
return self
def _initialize_default_memories(self):
def _initialize_default_memories(self) -> None:
self._long_term_memory = self._long_term_memory or LongTermMemory()
self._short_term_memory = self._short_term_memory or ShortTermMemory(
crew=self,
@@ -311,7 +307,7 @@ class Crew(FlowTrackable, BaseModel):
)
@model_validator(mode="after")
def create_crew_memory(self) -> "Crew":
def create_crew_memory(self) -> Self:
"""Initialize private memory attributes."""
self._external_memory = (
# External memory doesn't support a default value since it was designed to be managed entirely externally
@@ -328,7 +324,7 @@ class Crew(FlowTrackable, BaseModel):
return self
@model_validator(mode="after")
def create_crew_knowledge(self) -> "Crew":
def create_crew_knowledge(self) -> Self:
"""Create the knowledge for the crew."""
if self.knowledge_sources:
try:
@@ -349,7 +345,7 @@ class Crew(FlowTrackable, BaseModel):
return self
@model_validator(mode="after")
def check_manager_llm(self):
def check_manager_llm(self) -> Self:
"""Validates that the language model is set when using hierarchical process."""
if self.process == Process.hierarchical:
if not self.manager_llm and not self.manager_agent:
@@ -371,7 +367,7 @@ class Crew(FlowTrackable, BaseModel):
return self
@model_validator(mode="after")
def check_config(self):
def check_config(self) -> Self:
"""Validates that the crew is properly configured with agents and tasks."""
if not self.config and not self.tasks and not self.agents:
raise PydanticCustomError(
@@ -392,20 +388,20 @@ class Crew(FlowTrackable, BaseModel):
return self
@model_validator(mode="after")
def validate_tasks(self):
def validate_tasks(self) -> Self:
if self.process == Process.sequential:
for task in self.tasks:
if task.agent is None:
raise PydanticCustomError(
"missing_agent_in_task",
f"Sequential process error: Agent is missing in the task with the following description: {task.description}", # type: ignore # Argument of type "str" cannot be assigned to parameter "message_template" of type "LiteralString"
f"Sequential process error: Agent is missing in the task with the following description: {task.description}",
{},
)
return self
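These validators fail via PydanticCustomError, which pairs a machine-readable error type with a human-readable message; the removed type: ignore comments were only suppressing a LiteralString complaint about the f-string template. An isolated sketch of the raise:

from pydantic_core import PydanticCustomError

def require_agent(description: str, agent: object | None) -> None:
    # Same structured error as above: a stable error-type slug plus an
    # f-string message; the trailing dict is the (empty) template context.
    if agent is None:
        raise PydanticCustomError(
            "missing_agent_in_task",
            f"Sequential process error: Agent is missing in the task with the following description: {description}",
            {},
        )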
@model_validator(mode="after")
def validate_end_with_at_most_one_async_task(self):
def validate_end_with_at_most_one_async_task(self) -> Self:
"""Validates that the crew ends with at most one asynchronous task."""
final_async_task_count = 0
@@ -426,7 +422,7 @@ class Crew(FlowTrackable, BaseModel):
return self
@model_validator(mode="after")
def validate_must_have_non_conditional_task(self) -> "Crew":
def validate_must_have_non_conditional_task(self) -> Self:
"""Ensure that a crew has at least one non-conditional task."""
if not self.tasks:
return self
@@ -442,7 +438,7 @@ class Crew(FlowTrackable, BaseModel):
return self
@model_validator(mode="after")
def validate_first_task(self) -> "Crew":
def validate_first_task(self) -> Self:
"""Ensure the first task is not a ConditionalTask."""
if self.tasks and isinstance(self.tasks[0], ConditionalTask):
raise PydanticCustomError(
@@ -453,19 +449,21 @@ class Crew(FlowTrackable, BaseModel):
return self
@model_validator(mode="after")
def validate_async_tasks_not_async(self) -> "Crew":
def validate_async_tasks_not_async(self) -> Self:
"""Ensure that ConditionalTask is not async."""
for task in self.tasks:
if task.async_execution and isinstance(task, ConditionalTask):
raise PydanticCustomError(
"invalid_async_conditional_task",
f"Conditional Task: {task.description} , cannot be executed asynchronously.", # type: ignore # Argument of type "str" cannot be assigned to parameter "message_template" of type "LiteralString"
f"Conditional Task: {task.description} , cannot be executed asynchronously.",
{},
)
return self
@model_validator(mode="after")
def validate_async_task_cannot_include_sequential_async_tasks_in_context(self):
def validate_async_task_cannot_include_sequential_async_tasks_in_context(
self,
) -> Self:
"""
Validates that if a task is set to be executed asynchronously,
it cannot include other asynchronous tasks in its context unless
@@ -485,7 +483,7 @@ class Crew(FlowTrackable, BaseModel):
return self
@model_validator(mode="after")
def validate_context_no_future_tasks(self):
def validate_context_no_future_tasks(self) -> Self:
"""Validates that a task's context does not include future tasks."""
task_indices = {id(task): i for i, task in enumerate(self.tasks)}
@@ -502,7 +500,7 @@ class Crew(FlowTrackable, BaseModel):
@property
def key(self) -> str:
source: List[str] = [agent.key for agent in self.agents] + [
source: list[str] = [agent.key for agent in self.agents] + [
task.key for task in self.tasks
]
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
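The key is a deterministic fingerprint of agent and task keys; usedforsecurity=False flags the digest as non-cryptographic, which keeps it usable on FIPS-restricted Python builds. A standalone sketch:

from hashlib import md5

def crew_key(agent_keys: list[str], task_keys: list[str]) -> str:
    # Order-sensitive: the same members in a different order hash differently.
    source = agent_keys + task_keys
    return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()

print(crew_key(["researcher"], ["summarize"]))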
@@ -517,7 +515,7 @@ class Crew(FlowTrackable, BaseModel):
"""
return self.security_config.fingerprint
def _setup_from_config(self):
def _setup_from_config(self) -> None:
"""Initializes agents and tasks from the provided config."""
assert self.config is not None, "Config should not be None."
@@ -530,7 +528,7 @@ class Crew(FlowTrackable, BaseModel):
self.agents = [Agent(**agent) for agent in self.config["agents"]]
self.tasks = [self._create_task(task) for task in self.config["tasks"]]
def _create_task(self, task_config: Dict[str, Any]) -> Task:
def _create_task(self, task_config: dict[str, Any]) -> Task:
"""Creates a task instance from its configuration.
Args:
@@ -559,9 +557,10 @@ class Crew(FlowTrackable, BaseModel):
CrewTrainingHandler(filename).initialize_file()
def train(
self, n_iterations: int, filename: str, inputs: Optional[Dict[str, Any]] = {}
self, n_iterations: int, filename: str, inputs: Optional[dict[str, Any]] = None
) -> None:
"""Trains the crew for a given number of iterations."""
inputs = inputs or {}
try:
crewai_event_bus.emit(
self,
@@ -610,7 +609,7 @@ class Crew(FlowTrackable, BaseModel):
def kickoff(
self,
inputs: Optional[Dict[str, Any]] = None,
inputs: Optional[dict[str, Any]] = None,
) -> CrewOutput:
ctx = baggage.set_baggage(
"crew_context", CrewContext(id=str(self.id), key=self.key)
@@ -642,8 +641,7 @@ class Crew(FlowTrackable, BaseModel):
for agent in self.agents:
agent.i18n = i18n
# type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
agent.crew = self # type: ignore[attr-defined]
agent.crew = self
agent.set_knowledge(crew_embedder=self.embedder)
# TODO: Create an AgentFunctionCalling protocol for future refactoring
if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
@@ -652,7 +650,7 @@ class Crew(FlowTrackable, BaseModel):
if not agent.step_callback: # type: ignore # "BaseAgent" has no attribute "step_callback"
agent.step_callback = self.step_callback # type: ignore # "BaseAgent" has no attribute "step_callback"
agent.create_agent_executor()
# Agent executor will be created when tasks are executed
if self.planning:
self._handle_crew_planning()
@@ -681,9 +679,9 @@ class Crew(FlowTrackable, BaseModel):
finally:
detach(token)
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
def kickoff_for_each(self, inputs: list[dict[str, Any]]) -> list[CrewOutput]:
"""Executes the Crew's workflow for each input in the list and aggregates results."""
results: List[CrewOutput] = []
results: list[CrewOutput] = []
# Initialize the parent crew's usage metrics
total_usage_metrics = UsageMetrics()
@@ -702,14 +700,19 @@ class Crew(FlowTrackable, BaseModel):
self._task_output_handler.reset()
return results
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = {}) -> CrewOutput:
async def kickoff_async(
self, inputs: Optional[dict[str, Any]] = None
) -> CrewOutput:
"""Asynchronous kickoff method to start the crew execution."""
inputs = inputs or {}
return await asyncio.to_thread(self.kickoff, inputs)
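Both train and kickoff_async replace the mutable = {} default with None plus an inputs = inputs or {} normalization, avoiding one dict instance being shared across calls; kickoff_async then offloads the synchronous kickoff to a worker thread. A stdlib-only sketch of both ideas:

import asyncio
from typing import Any, Optional

def blocking_kickoff(inputs: dict[str, Any]) -> str:
    return f"ran with {inputs}"

async def kickoff_async(inputs: Optional[dict[str, Any]] = None) -> str:
    # None default + normalization: a fresh dict per call, never a shared one.
    inputs = inputs or {}
    # to_thread runs the blocking call without stalling the event loop.
    return await asyncio.to_thread(blocking_kickoff, inputs)

print(asyncio.run(kickoff_async()))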
async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[CrewOutput]:
async def kickoff_for_each_async(
self, inputs: list[dict[str, Any]]
) -> list[CrewOutput]:
crew_copies = [self.copy() for _ in inputs]
async def run_crew(crew, input_data):
async def run_crew(crew: Self, input_data: dict[str, Any]) -> CrewOutput:
return await crew.kickoff_async(inputs=input_data)
tasks = [
@@ -728,7 +731,7 @@ class Crew(FlowTrackable, BaseModel):
self._task_output_handler.reset()
return results
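The now-typed run_crew helper produces one coroutine per copied crew, presumably gathered afterwards. The minimal shape of that fan-out, independent of crewai, is an asyncio.gather over per-input coroutines (gather preserves input order in its results):

import asyncio

async def run_one(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real async work
    return x * 2

async def run_for_each(inputs: list[int]) -> list[int]:
    # One coroutine per input, executed concurrently; results keep input order.
    return list(await asyncio.gather(*(run_one(x) for x in inputs)))

print(asyncio.run(run_for_each([1, 2, 3])))  # [2, 4, 6]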
def _handle_crew_planning(self):
def _handle_crew_planning(self) -> None:
"""Handles the Crew planning."""
self._logger.log("info", "Planning the crew execution")
result = CrewPlanner(
@@ -744,7 +747,7 @@ class Crew(FlowTrackable, BaseModel):
output: TaskOutput,
task_index: int,
was_replayed: bool = False,
):
) -> None:
if self._inputs:
inputs = self._inputs
else:
@@ -776,7 +779,7 @@ class Crew(FlowTrackable, BaseModel):
self._create_manager_agent()
return self._execute_tasks(self.tasks)
def _create_manager_agent(self):
def _create_manager_agent(self) -> None:
i18n = I18N(prompt_file=self.prompt_file)
if self.manager_agent is not None:
self.manager_agent.allow_delegation = True
@@ -788,7 +791,12 @@ class Crew(FlowTrackable, BaseModel):
manager.tools = []
raise Exception("Manager agent should not have tools")
else:
self.manager_llm = create_llm(self.manager_llm)
if self.manager_llm is None:
from crewai.utilities.llm_utils import create_default_llm
self.manager_llm = create_default_llm()
else:
self.manager_llm = create_llm(self.manager_llm)
manager = Agent(
role=i18n.retrieve("hierarchical_manager_agent", "role"),
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
@@ -803,7 +811,7 @@ class Crew(FlowTrackable, BaseModel):
def _execute_tasks(
self,
tasks: List[Task],
tasks: list[Task],
start_index: Optional[int] = 0,
was_replayed: bool = False,
) -> CrewOutput:
@@ -817,8 +825,8 @@ class Crew(FlowTrackable, BaseModel):
CrewOutput: Final output of the crew
"""
task_outputs: List[TaskOutput] = []
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
task_outputs: list[TaskOutput] = []
futures: list[tuple[Task, Future[TaskOutput], int]] = []
last_sync_output: Optional[TaskOutput] = None
for task_index, task in enumerate(tasks):
@@ -843,7 +851,7 @@ class Crew(FlowTrackable, BaseModel):
tools_for_task = self._prepare_tools(
agent_to_use,
task,
cast(Union[List[Tool], List[BaseTool]], tools_for_task),
cast(list[Tool] | list[BaseTool], tools_for_task),
)
self._log_task_start(task, agent_to_use.role)
@@ -863,7 +871,7 @@ class Crew(FlowTrackable, BaseModel):
future = task.execute_async(
agent=agent_to_use,
context=context,
tools=cast(List[BaseTool], tools_for_task),
tools=tools_for_task,
)
futures.append((task, future, task_index))
else:
@@ -875,7 +883,7 @@ class Crew(FlowTrackable, BaseModel):
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
tools=cast(List[BaseTool], tools_for_task),
tools=tools_for_task,
)
task_outputs.append(task_output)
self._process_task_result(task, task_output)
@@ -889,8 +897,8 @@ class Crew(FlowTrackable, BaseModel):
def _handle_conditional_task(
self,
task: ConditionalTask,
task_outputs: List[TaskOutput],
futures: List[Tuple[Task, Future[TaskOutput], int]],
task_outputs: list[TaskOutput],
futures: list[tuple[Task, Future[TaskOutput], int]],
task_index: int,
was_replayed: bool,
) -> Optional[TaskOutput]:
@@ -913,8 +921,8 @@ class Crew(FlowTrackable, BaseModel):
return None
def _prepare_tools(
self, agent: BaseAgent, task: Task, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
self, agent: BaseAgent, task: Task, tools: list[Tool] | list[BaseTool]
) -> list[BaseTool]:
# Add delegation tools if agent allows delegation
if hasattr(agent, "allow_delegation") and getattr(
agent, "allow_delegation", False
@@ -944,7 +952,7 @@ class Crew(FlowTrackable, BaseModel):
tools = self._add_multimodal_tools(agent, tools)
# Return a List[BaseTool] which is compatible with both Task.execute_sync and Task.execute_async
return cast(List[BaseTool], tools)
return cast(list[BaseTool], tools)
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
if self.process == Process.hierarchical:
@@ -953,12 +961,12 @@ class Crew(FlowTrackable, BaseModel):
def _merge_tools(
self,
existing_tools: Union[List[Tool], List[BaseTool]],
new_tools: Union[List[Tool], List[BaseTool]],
) -> List[BaseTool]:
existing_tools: list[Tool] | list[BaseTool],
new_tools: list[Tool] | list[BaseTool],
) -> list[BaseTool]:
"""Merge new tools into existing tools list, avoiding duplicates by tool name."""
if not new_tools:
return cast(List[BaseTool], existing_tools)
return cast(list[BaseTool], existing_tools)
# Create mapping of tool names to new tools
new_tool_map = {tool.name: tool for tool in new_tools}
@@ -969,41 +977,41 @@ class Crew(FlowTrackable, BaseModel):
# Add all new tools
tools.extend(new_tools)
return cast(List[BaseTool], tools)
return tools
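_merge_tools can now return the list directly since it is already list[BaseTool]; the dedup rule is name-based, with new tools winning collisions. A self-contained sketch of that rule:

from dataclasses import dataclass

@dataclass
class Tool:
    name: str

def merge_tools(existing: list[Tool], new: list[Tool]) -> list[Tool]:
    # Keep existing tools whose names are not being replaced, then append
    # every new tool, so name collisions resolve in favor of the new list.
    if not new:
        return existing
    new_names = {t.name for t in new}
    tools = [t for t in existing if t.name not in new_names]
    tools.extend(new)
    return tools

print([t.name for t in merge_tools([Tool("a"), Tool("b")], [Tool("b")])])  # ['a', 'b']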
def _inject_delegation_tools(
self,
tools: Union[List[Tool], List[BaseTool]],
tools: list[Tool] | list[BaseTool],
task_agent: BaseAgent,
agents: List[BaseAgent],
) -> List[BaseTool]:
agents: list[BaseAgent],
) -> list[BaseTool]:
if hasattr(task_agent, "get_delegation_tools"):
delegation_tools = task_agent.get_delegation_tools(agents)
# Cast delegation_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], delegation_tools))
return cast(List[BaseTool], tools)
return self._merge_tools(tools, delegation_tools)
return cast(list[BaseTool], tools)
def _add_multimodal_tools(
self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
self, agent: BaseAgent, tools: list[Tool] | list[BaseTool]
) -> list[BaseTool]:
if hasattr(agent, "get_multimodal_tools"):
multimodal_tools = agent.get_multimodal_tools()
# Cast multimodal_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], multimodal_tools))
return cast(List[BaseTool], tools)
return self._merge_tools(tools, cast(list[BaseTool], multimodal_tools))
return cast(list[BaseTool], tools)
def _add_code_execution_tools(
self, agent: BaseAgent, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
self, agent: BaseAgent, tools: list[Tool] | list[BaseTool]
) -> list[BaseTool]:
if hasattr(agent, "get_code_execution_tools"):
code_tools = agent.get_code_execution_tools()
# Cast code_tools to the expected type for _merge_tools
return self._merge_tools(tools, cast(List[BaseTool], code_tools))
return cast(List[BaseTool], tools)
return self._merge_tools(tools, cast(list[BaseTool], code_tools))
return cast(list[BaseTool], tools)
def _add_delegation_tools(
self, task: Task, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
self, task: Task, tools: list[Tool] | list[BaseTool]
) -> list[BaseTool]:
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
if not tools:
@@ -1011,17 +1019,17 @@ class Crew(FlowTrackable, BaseModel):
tools = self._inject_delegation_tools(
tools, task.agent, agents_for_delegation
)
return cast(List[BaseTool], tools)
return cast(list[BaseTool], tools)
def _log_task_start(self, task: Task, role: str = "None"):
def _log_task_start(self, task: Task, role: str = "None") -> None:
if self.output_log_file:
self._file_handler.log(
task_name=task.name, task=task.description, agent=role, status="started"
)
def _update_manager_tools(
self, task: Task, tools: Union[List[Tool], List[BaseTool]]
) -> List[BaseTool]:
self, task: Task, tools: list[Tool] | list[BaseTool]
) -> list[BaseTool]:
if self.manager_agent:
if task.agent:
tools = self._inject_delegation_tools(tools, task.agent, [task.agent])
@@ -1029,9 +1037,9 @@ class Crew(FlowTrackable, BaseModel):
tools = self._inject_delegation_tools(
tools, self.manager_agent, self.agents
)
return cast(List[BaseTool], tools)
return cast(list[BaseTool], tools)
def _get_context(self, task: Task, task_outputs: List[TaskOutput]) -> str:
def _get_context(self, task: Task, task_outputs: list[TaskOutput]) -> str:
if not task.context:
return ""
@@ -1053,7 +1061,7 @@ class Crew(FlowTrackable, BaseModel):
output=output.raw,
)
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
def _create_crew_output(self, task_outputs: list[TaskOutput]) -> CrewOutput:
if not task_outputs:
raise ValueError("No task outputs available to create crew output.")
@@ -1084,10 +1092,10 @@ class Crew(FlowTrackable, BaseModel):
def _process_async_tasks(
self,
futures: List[Tuple[Task, Future[TaskOutput], int]],
futures: list[tuple[Task, Future[TaskOutput], int]],
was_replayed: bool = False,
) -> List[TaskOutput]:
task_outputs: List[TaskOutput] = []
) -> list[TaskOutput]:
task_outputs: list[TaskOutput] = []
for future_task, future, task_index in futures:
task_output = future.result()
task_outputs.append(task_output)
@@ -1098,7 +1106,7 @@ class Crew(FlowTrackable, BaseModel):
return task_outputs
def _find_task_index(
self, task_id: str, stored_outputs: List[Any]
self, task_id: str, stored_outputs: list[Any]
) -> Optional[int]:
return next(
(
@@ -1110,7 +1118,7 @@ class Crew(FlowTrackable, BaseModel):
)
def replay(
self, task_id: str, inputs: Optional[Dict[str, Any]] = None
self, task_id: str, inputs: Optional[dict[str, Any]] = None
) -> CrewOutput:
stored_outputs = self._task_output_handler.load()
if not stored_outputs:
@@ -1151,15 +1159,15 @@ class Crew(FlowTrackable, BaseModel):
return result
def query_knowledge(
self, query: List[str], results_limit: int = 3, score_threshold: float = 0.35
) -> Union[List[Dict[str, Any]], None]:
self, query: list[str], results_limit: int = 3, score_threshold: float = 0.35
) -> list[dict[str, Any]] | None:
if self.knowledge:
return self.knowledge.query(
query, results_limit=results_limit, score_threshold=score_threshold
)
return None
def fetch_inputs(self) -> Set[str]:
def fetch_inputs(self) -> set[str]:
"""
Gathers placeholders (e.g., {something}) referenced in tasks or agents.
Scans each task's 'description' + 'expected_output', and each agent's
@@ -1168,7 +1176,7 @@ class Crew(FlowTrackable, BaseModel):
Returns a set of all discovered placeholder names.
"""
placeholder_pattern = re.compile(r"\{(.+?)\}")
required_inputs: Set[str] = set()
required_inputs: set[str] = set()
# Scan tasks for inputs
for task in self.tasks:
@@ -1184,7 +1192,18 @@ class Crew(FlowTrackable, BaseModel):
return required_inputs
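The placeholder scan relies on a non-greedy regex so adjacent placeholders in one string are captured separately. A standalone sketch:

import re

placeholder_pattern = re.compile(r"\{(.+?)\}")

def find_placeholders(*texts: str) -> set[str]:
    # Non-greedy match keeps "{a} and {b}" as two names instead of one.
    found: set[str] = set()
    for text in texts:
        found.update(placeholder_pattern.findall(text))
    return found

print(find_placeholders("Research {topic}", "Write about {topic} for {audience}"))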
def copy(self):
def copy(
self,
*,
include: Optional[
Set[int] | Set[str] | Mapping[int, Any] | Mapping[str, Any]
] = None,
exclude: Optional[
Set[int] | Set[str] | Mapping[int, Any] | Mapping[str, Any]
] = None,
update: Optional[dict[str, Any]] = None,
deep: bool = True,
) -> "Crew":
"""
Creates a deep copy of the Crew instance.
@@ -1215,7 +1234,7 @@ class Crew(FlowTrackable, BaseModel):
manager_agent = self.manager_agent.copy() if self.manager_agent else None
manager_llm = shallow_copy(self.manager_llm) if self.manager_llm else None
task_mapping = {}
task_mapping: dict[str, Task] = {}
cloned_tasks = []
existing_knowledge_sources = shallow_copy(self.knowledge_sources)
@@ -1270,16 +1289,10 @@ class Crew(FlowTrackable, BaseModel):
if not task.callback:
task.callback = self.task_callback
def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
def _interpolate_inputs(self, inputs: dict[str, Any]) -> None:
"""Interpolates the inputs in the tasks and agents."""
[
task.interpolate_inputs_and_add_conversation_history(
# type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None)
inputs
)
for task in self.tasks
]
# type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None)
for task in self.tasks:
task.interpolate_inputs_and_add_conversation_history(inputs)
for agent in self.agents:
agent.interpolate_inputs(inputs)
@@ -1303,8 +1316,8 @@ class Crew(FlowTrackable, BaseModel):
def test(
self,
n_iterations: int,
eval_llm: Union[str, InstanceOf[BaseLLM]],
inputs: Optional[Dict[str, Any]] = None,
eval_llm: str | InstanceOf[BaseLLM],
inputs: Optional[dict[str, Any]] = None,
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
try:
@@ -1345,7 +1358,7 @@ class Crew(FlowTrackable, BaseModel):
)
raise
def __repr__(self):
def __repr__(self) -> str:
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"
def reset_memories(self, command_type: str) -> None:
@@ -1397,7 +1410,9 @@ class Crew(FlowTrackable, BaseModel):
if (system := config.get("system")) is not None:
name = config.get("name")
try:
reset_fn: Callable = cast(Callable, config.get("reset"))
reset_fn: Callable[..., None] = cast(
Callable[..., None], config.get("reset")
)
reset_fn(system)
self._logger.log(
"info",
@@ -1426,7 +1441,9 @@ class Crew(FlowTrackable, BaseModel):
raise RuntimeError(f"{name} memory system is not initialized")
try:
reset_fn: Callable = cast(Callable, config.get("reset"))
reset_fn: Callable[..., None] = cast(
Callable[..., None], config.get("reset")
)
reset_fn(system)
self._logger.log(
"info",
@@ -1437,18 +1454,18 @@ class Crew(FlowTrackable, BaseModel):
f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {str(e)}"
) from e
def _get_memory_systems(self):
def _get_memory_systems(self) -> dict[str, dict[str, Any]]:
"""Get all available memory systems with their configuration.
Returns:
Dict containing all memory systems with their reset functions and display names.
"""
def default_reset(memory):
return memory.reset()
def default_reset(memory: Any) -> None:
memory.reset()
def knowledge_reset(memory):
return self.reset_knowledge(memory)
def knowledge_reset(memory: Any) -> None:
self.reset_knowledge(memory)
# Get knowledge for agents
agent_knowledges = [
@@ -1502,12 +1519,12 @@ class Crew(FlowTrackable, BaseModel):
},
}
def reset_knowledge(self, knowledges: List[Knowledge]) -> None:
def reset_knowledge(self, knowledges: list[Knowledge]) -> None:
"""Reset crew and agent knowledge storage."""
for ks in knowledges:
ks.reset()
def _set_allow_crewai_trigger_context_for_first_task(self):
def _set_allow_crewai_trigger_context_for_first_task(self) -> None:
crewai_trigger_payload = self._inputs and self._inputs.get(
"crewai_trigger_payload"
)

View File

@@ -0,0 +1,56 @@
"""CrewAI events system for monitoring and extending agent behavior.
This module provides the event infrastructure that allows users to:
- Monitor agent, task, and crew execution
- Track memory operations and performance
- Build custom logging and analytics
- Extend CrewAI with custom event handlers
"""
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.memory_events import (
MemoryQueryCompletedEvent,
MemorySaveCompletedEvent,
MemorySaveStartedEvent,
MemoryQueryStartedEvent,
MemoryRetrievalCompletedEvent,
MemorySaveFailedEvent,
MemoryQueryFailedEvent,
)
from crewai.events.types.knowledge_events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
)
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
)
from crewai.events.types.llm_events import (
LLMStreamChunkEvent,
)
__all__ = [
"BaseEventListener",
"crewai_event_bus",
"MemoryQueryCompletedEvent",
"MemorySaveCompletedEvent",
"MemorySaveStartedEvent",
"MemoryQueryStartedEvent",
"MemoryRetrievalCompletedEvent",
"MemorySaveFailedEvent",
"MemoryQueryFailedEvent",
"KnowledgeRetrievalStartedEvent",
"KnowledgeRetrievalCompletedEvent",
"CrewKickoffStartedEvent",
"CrewKickoffCompletedEvent",
"AgentExecutionCompletedEvent",
"LLMStreamChunkEvent",
]
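A hedged usage sketch of the new top-level package, using only names exported in __all__ above (the handler body is illustrative):

from typing import Any

from crewai.events import MemorySaveCompletedEvent, crewai_event_bus

@crewai_event_bus.on(MemorySaveCompletedEvent)
def log_memory_save(source: Any, event: MemorySaveCompletedEvent) -> None:
    # React to every completed memory save, wherever it originates.
    print(f"memory save completed from {type(source).__name__}")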

View File

@@ -0,0 +1,15 @@
from abc import ABC, abstractmethod
from crewai.events.event_bus import CrewAIEventsBus, crewai_event_bus
class BaseEventListener(ABC):
verbose: bool = False
def __init__(self) -> None:
super().__init__()
self.setup_listeners(crewai_event_bus)
@abstractmethod
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None:
pass
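Because __init__ calls setup_listeners with the global bus, instantiating a subclass is all it takes to register its handlers. An illustrative subclass, assuming the package layout introduced in this PR:

from typing import Any

from crewai.events.base_event_listener import BaseEventListener
from crewai.events.event_bus import CrewAIEventsBus
from crewai.events.types.crew_events import CrewKickoffStartedEvent

class KickoffLogger(BaseEventListener):
    def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None:
        @crewai_event_bus.on(CrewKickoffStartedEvent)
        def on_started(source: Any, event: CrewKickoffStartedEvent) -> None:
            print(f"crew '{event.crew_name}' kicked off")

listener = KickoffLogger()  # registration happens in __init__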

View File

@@ -11,7 +11,9 @@ class BaseEvent(BaseModel):
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"
source_type: Optional[str] = (
None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"
)
fingerprint_metadata: Optional[Dict[str, Any]] = None # Any relevant metadata
def to_json(self, exclude: set[str] | None = None):
@@ -25,3 +27,20 @@ class BaseEvent(BaseModel):
dict: A JSON-serializable dictionary.
"""
return to_serializable(self, exclude=exclude)
def _set_task_params(self, data: Dict[str, Any]):
if "from_task" in data and (task := data["from_task"]):
self.task_id = task.id
self.task_name = task.name or task.description
self.from_task = None
def _set_agent_params(self, data: Dict[str, Any]):
task = data.get("from_task", None)
agent = task.agent if task else data.get("from_agent", None)
if not agent:
return
self.agent_id = agent.id
self.agent_role = agent.role
self.from_agent = None

View File

@@ -0,0 +1,115 @@
from __future__ import annotations
import threading
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from typing import Any, ParamSpec, TypeVar, cast
from blinker import Signal
from typing_extensions import Self
from crewai.events.base_events import BaseEvent
EventT = TypeVar("EventT", bound=BaseEvent)
P = ParamSpec("P")
class CrewAIEventsBus:
"""
A singleton event bus that uses blinker signals for event handling.
Allows both internal (Flow/Crew) and external event handling.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls) -> Self:
if cls._instance is None:
with cls._lock:
if cls._instance is None: # prevent race condition
cls._instance = super().__new__(cls)
cls._instance._initialize()
return cls._instance
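# Double-checked locking: the unlocked first check keeps the common path
# lock-free; the second check inside the lock stops two threads that both
# saw None from each constructing and initializing an instance.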
def _initialize(self) -> None:
"""Initialize the event bus internal state"""
self._signal = Signal("crewai_event_bus")
self._handlers: dict[type[BaseEvent], list[Callable[[Any, Any], None]]] = {}
def on(
self, event_type: type[EventT]
) -> Callable[[Callable[[Any, EventT], None]], Callable[[Any, EventT], None]]:
"""
Decorator to register an event handler for a specific event type.
Usage:
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(
source: Any, event: AgentExecutionCompletedEvent
):
print(f"👍 Agent '{event.agent}' completed task")
print(f" Output: {event.output}")
"""
def decorator(
handler: Callable[[Any, EventT], None],
) -> Callable[[Any, EventT], None]:
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(cast(Callable[[Any, Any], None], handler))
return handler
return decorator
def emit(self, source: Any, event: BaseEvent) -> None:
"""
Emit an event to all registered handlers
Args:
source: The object emitting the event
event: The event instance to emit
"""
for event_type, handlers in self._handlers.items():
if isinstance(event, event_type):
for handler in handlers:
try:
handler(source, event)
except Exception as e:
print(
f"[EventBus Error] Handler '{handler.__name__}' failed for event '{event_type.__name__}': {e}"
)
self._signal.send(source, event=event)
def register_handler(
self, event_type: type[BaseEvent], handler: Callable[[Any, Any], None]
) -> None:
"""Register an event handler for a specific event type"""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
@contextmanager
def scoped_handlers(self) -> Iterator[None]:
"""
Context manager for temporary event handling scope.
Useful for testing or temporary event handling.
Usage:
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStartedEvent)
def temp_handler(source, event):
print("Temporary handler")
# Do stuff...
# Handlers are cleared after the context
"""
previous_handlers = self._handlers.copy()
self._handlers.clear()
try:
yield
finally:
self._handlers = previous_handlers
# Global instance
crewai_event_bus = CrewAIEventsBus()
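A hedged usage sketch of the bus above; the event class is illustrative, defined here only to have something concrete to emit:

from typing import Any

from crewai.events.base_events import BaseEvent

class PingEvent(BaseEvent):
    type: str = "ping"

with crewai_event_bus.scoped_handlers():

    @crewai_event_bus.on(PingEvent)
    def on_ping(source: Any, event: PingEvent) -> None:
        print("ping received")

    crewai_event_bus.emit(source=None, event=PingEvent())
# Handlers registered inside the block are discarded on exit;
# previously registered handlers are restored.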

View File

@@ -1,43 +1,21 @@
from __future__ import annotations
from io import StringIO
from typing import Any, Dict
from typing import Any
from pydantic import Field, PrivateAttr
from crewai.llm import LLM
from crewai.task import Task
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import Logger
from crewai.utilities.constants import EMITTER_COLOR
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeRetrievalStartedEvent,
KnowledgeSearchQueryFailedEvent,
)
from crewai.utilities.events.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMStreamChunkEvent,
)
from crewai.utilities.events.llm_guardrail_events import (
LLMGuardrailStartedEvent,
LLMGuardrailCompletedEvent,
)
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from typing_extensions import Self
from .agent_events import (
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.event_bus import CrewAIEventsBus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionStartedEvent,
AgentLogsStartedEvent,
AgentLogsExecutionEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from .crew_events import (
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
@@ -49,7 +27,37 @@ from .crew_events import (
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from .flow_events import (
from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeQueryStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeRetrievalStartedEvent,
KnowledgeSearchQueryFailedEvent,
)
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMStreamChunkEvent,
)
from crewai.events.types.llm_guardrail_events import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from crewai.events.types.logging_events import (
AgentLogsExecutionEvent,
AgentLogsStartedEvent,
)
from crewai.events.utils.console_formatter import ConsoleFormatter
from crewai.llm import LLM
from crewai.task import Task
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import Logger
from crewai.utilities.constants import EMITTER_COLOR
from .listeners.memory_listener import MemoryListener
from .types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowStartedEvent,
@@ -57,38 +65,37 @@ from .flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from .task_events import TaskCompletedEvent, TaskFailedEvent, TaskStartedEvent
from .tool_usage_events import (
from .types.reasoning_events import (
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
AgentReasoningStartedEvent,
)
from .types.task_events import TaskCompletedEvent, TaskFailedEvent, TaskStartedEvent
from .types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from .reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
)
from .listeners.memory_listener import MemoryListener
class EventListener(BaseEventListener):
_instance = None
_initialized: bool = False
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
logger = Logger(verbose=True, default_color=EMITTER_COLOR)
execution_spans: Dict[Task, Any] = Field(default_factory=dict)
execution_spans: dict[Task, Any] = Field(default_factory=dict)
next_chunk = 0
text_stream = StringIO()
knowledge_retrieval_in_progress = False
knowledge_query_in_progress = False
def __new__(cls):
def __new__(cls) -> Self:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
def __init__(self) -> None:
if not hasattr(self, "_initialized") or not self._initialized:
super().__init__()
self._telemetry = Telemetry()
@@ -101,14 +108,14 @@ class EventListener(BaseEventListener):
# ----------- CREW EVENTS -----------
def setup_listeners(self, crewai_event_bus):
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None:
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent):
def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None:
self.formatter.create_crew_tree(event.crew_name or "Crew", source.id)
self._telemetry.crew_execution_span(source, event.inputs)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event: CrewKickoffCompletedEvent):
def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None:
# Handle telemetry
final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output)
@@ -122,7 +129,7 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent):
def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None:
self.formatter.update_crew_tree(
self.formatter.current_crew_tree,
event.crew_name or "Crew",
@@ -131,23 +138,25 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(CrewTrainStartedEvent)
def on_crew_train_started(source, event: CrewTrainStartedEvent):
def on_crew_train_started(source: Any, event: CrewTrainStartedEvent) -> None:
self.formatter.handle_crew_train_started(
event.crew_name or "Crew", str(event.timestamp)
)
@crewai_event_bus.on(CrewTrainCompletedEvent)
def on_crew_train_completed(source, event: CrewTrainCompletedEvent):
def on_crew_train_completed(
source: Any, event: CrewTrainCompletedEvent
) -> None:
self.formatter.handle_crew_train_completed(
event.crew_name or "Crew", str(event.timestamp)
)
@crewai_event_bus.on(CrewTrainFailedEvent)
def on_crew_train_failed(source, event: CrewTrainFailedEvent):
def on_crew_train_failed(source: Any, event: CrewTrainFailedEvent) -> None:
self.formatter.handle_crew_train_failed(event.crew_name or "Crew")
@crewai_event_bus.on(CrewTestResultEvent)
def on_crew_test_result(source, event: CrewTestResultEvent):
def on_crew_test_result(source: Any, event: CrewTestResultEvent) -> None:
self._telemetry.individual_test_result_span(
source.crew,
event.quality,
@@ -158,17 +167,17 @@ class EventListener(BaseEventListener):
# ----------- TASK EVENTS -----------
@crewai_event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStartedEvent):
def on_task_started(source: Any, event: TaskStartedEvent) -> None:
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
self.execution_spans[source] = span
# Pass both task ID and task name (if set)
task_name = source.name if hasattr(source, 'name') and source.name else None
task_name = source.name if hasattr(source, "name") and source.name else None
self.formatter.create_task_branch(
self.formatter.current_crew_tree, source.id, task_name
)
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent):
def on_task_completed(source: Any, event: TaskCompletedEvent) -> None:
# Handle telemetry
span = self.execution_spans.get(source)
if span:
@@ -176,17 +185,17 @@ class EventListener(BaseEventListener):
self.execution_spans[source] = None
# Pass task name if it exists
task_name = source.name if hasattr(source, 'name') and source.name else None
task_name = source.name if hasattr(source, "name") and source.name else None
self.formatter.update_task_status(
self.formatter.current_crew_tree,
source.id,
source.agent.role,
"completed",
task_name
task_name,
)
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent):
def on_task_failed(source: Any, event: TaskFailedEvent) -> None:
span = self.execution_spans.get(source)
if span:
if source.agent and source.agent.crew:
@@ -194,19 +203,21 @@ class EventListener(BaseEventListener):
self.execution_spans[source] = None
# Pass task name if it exists
task_name = source.name if hasattr(source, 'name') and source.name else None
task_name = source.name if hasattr(source, "name") and source.name else None
self.formatter.update_task_status(
self.formatter.current_crew_tree,
source.id,
source.agent.role,
"failed",
task_name
task_name,
)
# ----------- AGENT EVENTS -----------
@crewai_event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event: AgentExecutionStartedEvent):
def on_agent_execution_started(
source: Any, event: AgentExecutionStartedEvent
) -> None:
self.formatter.create_agent_branch(
self.formatter.current_task_branch,
event.agent.role,
@@ -214,7 +225,9 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent):
def on_agent_execution_completed(
source: Any, event: AgentExecutionCompletedEvent
) -> None:
self.formatter.update_agent_status(
self.formatter.current_agent_branch,
event.agent.role,
@@ -225,8 +238,8 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(LiteAgentExecutionStartedEvent)
def on_lite_agent_execution_started(
source, event: LiteAgentExecutionStartedEvent
):
source: Any, event: LiteAgentExecutionStartedEvent
) -> None:
"""Handle LiteAgent execution started event."""
self.formatter.handle_lite_agent_execution(
event.agent_info["role"], status="started", **event.agent_info
@@ -234,15 +247,17 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_execution_completed(
source, event: LiteAgentExecutionCompletedEvent
):
source: Any, event: LiteAgentExecutionCompletedEvent
) -> None:
"""Handle LiteAgent execution completed event."""
self.formatter.handle_lite_agent_execution(
event.agent_info["role"], status="completed", **event.agent_info
)
@crewai_event_bus.on(LiteAgentExecutionErrorEvent)
def on_lite_agent_execution_error(source, event: LiteAgentExecutionErrorEvent):
def on_lite_agent_execution_error(
source: Any, event: LiteAgentExecutionErrorEvent
) -> None:
"""Handle LiteAgent execution error event."""
self.formatter.handle_lite_agent_execution(
event.agent_info["role"],
@@ -254,25 +269,27 @@ class EventListener(BaseEventListener):
# ----------- FLOW EVENTS -----------
@crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent):
def on_flow_created(source: Any, event: FlowCreatedEvent) -> None:
self._telemetry.flow_creation_span(event.flow_name)
self.formatter.create_flow_tree(event.flow_name, str(source.flow_id))
@crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent):
def on_flow_started(source: Any, event: FlowStartedEvent) -> None:
self._telemetry.flow_execution_span(
event.flow_name, list(source._methods.keys())
)
self.formatter.start_flow(event.flow_name, str(source.flow_id))
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent):
def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
self.formatter.update_flow_status(
self.formatter.current_flow_tree, event.flow_name, source.flow_id
)
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(source, event: MethodExecutionStartedEvent):
def on_method_execution_started(
source: Any, event: MethodExecutionStartedEvent
) -> None:
self.formatter.update_method_status(
self.formatter.current_method_branch,
self.formatter.current_flow_tree,
@@ -281,7 +298,9 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinishedEvent):
def on_method_execution_finished(
source: Any, event: MethodExecutionFinishedEvent
) -> None:
self.formatter.update_method_status(
self.formatter.current_method_branch,
self.formatter.current_flow_tree,
@@ -290,7 +309,9 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(MethodExecutionFailedEvent)
def on_method_execution_failed(source, event: MethodExecutionFailedEvent):
def on_method_execution_failed(
source: Any, event: MethodExecutionFailedEvent
) -> None:
self.formatter.update_method_status(
self.formatter.current_method_branch,
self.formatter.current_flow_tree,
@@ -301,7 +322,7 @@ class EventListener(BaseEventListener):
# ----------- TOOL USAGE EVENTS -----------
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
def on_tool_usage_started(source: Any, event: ToolUsageStartedEvent) -> None:
if isinstance(source, LLM):
self.formatter.handle_llm_tool_usage_started(
event.tool_name,
@@ -315,7 +336,7 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
def on_tool_usage_finished(source: Any, event: ToolUsageFinishedEvent) -> None:
if isinstance(source, LLM):
self.formatter.handle_llm_tool_usage_finished(
event.tool_name,
@@ -328,7 +349,7 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
def on_tool_usage_error(source: Any, event: ToolUsageErrorEvent) -> None:
if isinstance(source, LLM):
self.formatter.handle_llm_tool_usage_error(
event.tool_name,
@@ -345,7 +366,7 @@ class EventListener(BaseEventListener):
# ----------- LLM EVENTS -----------
@crewai_event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
def on_llm_call_started(source: Any, event: LLMCallStartedEvent) -> None:
# Capture the returned tool branch and update the current_tool_branch reference
thinking_branch = self.formatter.handle_llm_call_started(
self.formatter.current_agent_branch,
@@ -356,7 +377,7 @@ class EventListener(BaseEventListener):
self.formatter.current_tool_branch = thinking_branch
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
def on_llm_call_completed(source: Any, event: LLMCallCompletedEvent) -> None:
self.formatter.handle_llm_call_completed(
self.formatter.current_tool_branch,
self.formatter.current_agent_branch,
@@ -364,7 +385,7 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event: LLMCallFailedEvent):
def on_llm_call_failed(source: Any, event: LLMCallFailedEvent) -> None:
self.formatter.handle_llm_call_failed(
self.formatter.current_tool_branch,
event.error,
@@ -372,7 +393,7 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(LLMStreamChunkEvent)
def on_llm_stream_chunk(source, event: LLMStreamChunkEvent):
def on_llm_stream_chunk(source: Any, event: LLMStreamChunkEvent) -> None:
self.text_stream.write(event.chunk)
self.text_stream.seek(self.next_chunk)
@@ -385,7 +406,9 @@ class EventListener(BaseEventListener):
# ----------- LLM GUARDRAIL EVENTS -----------
@crewai_event_bus.on(LLMGuardrailStartedEvent)
def on_llm_guardrail_started(source, event: LLMGuardrailStartedEvent):
def on_llm_guardrail_started(
source: Any, event: LLMGuardrailStartedEvent
) -> None:
guardrail_str = str(event.guardrail)
guardrail_name = (
guardrail_str[:50] + "..." if len(guardrail_str) > 50 else guardrail_str
@@ -394,13 +417,15 @@ class EventListener(BaseEventListener):
self.formatter.handle_guardrail_started(guardrail_name, event.retry_count)
@crewai_event_bus.on(LLMGuardrailCompletedEvent)
def on_llm_guardrail_completed(source, event: LLMGuardrailCompletedEvent):
def on_llm_guardrail_completed(
source: Any, event: LLMGuardrailCompletedEvent
) -> None:
self.formatter.handle_guardrail_completed(
event.success, event.error, event.retry_count
)
@crewai_event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent):
def on_crew_test_started(source: Any, event: CrewTestStartedEvent) -> None:
cloned_crew = source.copy()
self._telemetry.test_execution_span(
cloned_crew,
@@ -414,20 +439,20 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(source, event: CrewTestCompletedEvent):
def on_crew_test_completed(source: Any, event: CrewTestCompletedEvent) -> None:
self.formatter.handle_crew_test_completed(
self.formatter.current_flow_tree,
event.crew_name or "Crew",
)
@crewai_event_bus.on(CrewTestFailedEvent)
def on_crew_test_failed(source, event: CrewTestFailedEvent):
def on_crew_test_failed(source: Any, event: CrewTestFailedEvent) -> None:
self.formatter.handle_crew_test_failed(event.crew_name or "Crew")
@crewai_event_bus.on(KnowledgeRetrievalStartedEvent)
def on_knowledge_retrieval_started(
source, event: KnowledgeRetrievalStartedEvent
):
source: Any, event: KnowledgeRetrievalStartedEvent
) -> None:
if self.knowledge_retrieval_in_progress:
return
@@ -440,8 +465,8 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(KnowledgeRetrievalCompletedEvent)
def on_knowledge_retrieval_completed(
source, event: KnowledgeRetrievalCompletedEvent
):
source: Any, event: KnowledgeRetrievalCompletedEvent
) -> None:
if not self.knowledge_retrieval_in_progress:
return
@@ -453,11 +478,15 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(KnowledgeQueryStartedEvent)
def on_knowledge_query_started(source, event: KnowledgeQueryStartedEvent):
def on_knowledge_query_started(
source: Any, event: KnowledgeQueryStartedEvent
) -> None:
pass
@crewai_event_bus.on(KnowledgeQueryFailedEvent)
def on_knowledge_query_failed(source, event: KnowledgeQueryFailedEvent):
def on_knowledge_query_failed(
source: Any, event: KnowledgeQueryFailedEvent
) -> None:
self.formatter.handle_knowledge_query_failed(
self.formatter.current_agent_branch,
event.error,
@@ -465,13 +494,15 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(KnowledgeQueryCompletedEvent)
def on_knowledge_query_completed(source, event: KnowledgeQueryCompletedEvent):
def on_knowledge_query_completed(
source: Any, event: KnowledgeQueryCompletedEvent
) -> None:
pass
@crewai_event_bus.on(KnowledgeSearchQueryFailedEvent)
def on_knowledge_search_query_failed(
source, event: KnowledgeSearchQueryFailedEvent
):
source: Any, event: KnowledgeSearchQueryFailedEvent
) -> None:
self.formatter.handle_knowledge_search_query_failed(
self.formatter.current_agent_branch,
event.error,
@@ -481,7 +512,9 @@ class EventListener(BaseEventListener):
# ----------- REASONING EVENTS -----------
@crewai_event_bus.on(AgentReasoningStartedEvent)
def on_agent_reasoning_started(source, event: AgentReasoningStartedEvent):
def on_agent_reasoning_started(
source: Any, event: AgentReasoningStartedEvent
) -> None:
self.formatter.handle_reasoning_started(
self.formatter.current_agent_branch,
event.attempt,
@@ -489,7 +522,9 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(AgentReasoningCompletedEvent)
def on_agent_reasoning_completed(source, event: AgentReasoningCompletedEvent):
def on_agent_reasoning_completed(
source: Any, event: AgentReasoningCompletedEvent
) -> None:
self.formatter.handle_reasoning_completed(
event.plan,
event.ready,
@@ -497,7 +532,9 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(AgentReasoningFailedEvent)
def on_agent_reasoning_failed(source, event: AgentReasoningFailedEvent):
def on_agent_reasoning_failed(
source: Any, event: AgentReasoningFailedEvent
) -> None:
self.formatter.handle_reasoning_failed(
event.error,
self.formatter.current_crew_tree,
@@ -506,7 +543,7 @@ class EventListener(BaseEventListener):
# ----------- AGENT LOGGING EVENTS -----------
@crewai_event_bus.on(AgentLogsStartedEvent)
def on_agent_logs_started(source, event: AgentLogsStartedEvent):
def on_agent_logs_started(source: Any, event: AgentLogsStartedEvent) -> None:
self.formatter.handle_agent_logs_started(
event.agent_role,
event.task_description,
@@ -514,7 +551,9 @@ class EventListener(BaseEventListener):
)
@crewai_event_bus.on(AgentLogsExecutionEvent)
def on_agent_logs_execution(source, event: AgentLogsExecutionEvent):
def on_agent_logs_execution(
source: Any, event: AgentLogsExecutionEvent
) -> None:
self.formatter.handle_agent_logs_execution(
event.agent_role,
event.formatted_answer,

View File

@@ -1,12 +1,12 @@
from typing import Union
from .agent_events import (
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
)
from .crew_events import (
from .types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
@@ -17,39 +17,39 @@ from .crew_events import (
CrewTrainFailedEvent,
CrewTrainStartedEvent,
)
from .flow_events import (
from .types.flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from .llm_events import (
from .types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMStreamChunkEvent,
)
from .llm_guardrail_events import (
from .types.llm_guardrail_events import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from .task_events import (
from .types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from .tool_usage_events import (
from .types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from .reasoning_events import (
from .types.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
)
from .knowledge_events import (
from .types.knowledge_events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeQueryStartedEvent,
@@ -58,7 +58,7 @@ from .knowledge_events import (
KnowledgeSearchQueryFailedEvent,
)
from .memory_events import (
from .types.memory_events import (
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,

View File

@@ -0,0 +1,5 @@
"""Event listener implementations for CrewAI.
This module contains various event listener implementations
for handling memory, tracing, and other event-driven functionality.
"""

View File

@@ -1,27 +1,30 @@
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.memory_events import (
from typing import Any
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.event_bus import CrewAIEventsBus
from crewai.events.types.memory_events import (
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemoryRetrievalCompletedEvent,
MemoryRetrievalStartedEvent,
MemoryQueryFailedEvent,
MemoryQueryCompletedEvent,
MemorySaveStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemorySaveStartedEvent,
)
class MemoryListener(BaseEventListener):
def __init__(self, formatter):
class MemoryListener(BaseEventListener):
def __init__(self, formatter: Any) -> None:
super().__init__()
self.formatter = formatter
self.memory_retrieval_in_progress = False
self.memory_save_in_progress = False
def setup_listeners(self, crewai_event_bus):
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None:
@crewai_event_bus.on(MemoryRetrievalStartedEvent)
def on_memory_retrieval_started(
source, event: MemoryRetrievalStartedEvent
):
source: Any, event: MemoryRetrievalStartedEvent
) -> None:
if self.memory_retrieval_in_progress:
return
@@ -34,8 +37,8 @@ class MemoryListener(BaseEventListener):
@crewai_event_bus.on(MemoryRetrievalCompletedEvent)
def on_memory_retrieval_completed(
source, event: MemoryRetrievalCompletedEvent
):
source: Any, event: MemoryRetrievalCompletedEvent
) -> None:
if not self.memory_retrieval_in_progress:
return
@@ -44,11 +47,13 @@ class MemoryListener(BaseEventListener):
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
event.memory_content,
event.retrieval_time_ms
event.retrieval_time_ms,
)
@crewai_event_bus.on(MemoryQueryCompletedEvent)
def on_memory_query_completed(source, event: MemoryQueryCompletedEvent):
def on_memory_query_completed(
source: Any, event: MemoryQueryCompletedEvent
) -> None:
if not self.memory_retrieval_in_progress:
return
@@ -60,7 +65,7 @@ class MemoryListener(BaseEventListener):
)
@crewai_event_bus.on(MemoryQueryFailedEvent)
def on_memory_query_failed(source, event: MemoryQueryFailedEvent):
def on_memory_query_failed(source: Any, event: MemoryQueryFailedEvent) -> None:
if not self.memory_retrieval_in_progress:
return
@@ -72,7 +77,7 @@ class MemoryListener(BaseEventListener):
)
@crewai_event_bus.on(MemorySaveStartedEvent)
def on_memory_save_started(source, event: MemorySaveStartedEvent):
def on_memory_save_started(source: Any, event: MemorySaveStartedEvent) -> None:
if self.memory_save_in_progress:
return
@@ -84,7 +89,9 @@ class MemoryListener(BaseEventListener):
)
@crewai_event_bus.on(MemorySaveCompletedEvent)
def on_memory_save_completed(source, event: MemorySaveCompletedEvent):
def on_memory_save_completed(
source: Any, event: MemorySaveCompletedEvent
) -> None:
if not self.memory_save_in_progress:
return
@@ -98,7 +105,7 @@ class MemoryListener(BaseEventListener):
)
@crewai_event_bus.on(MemorySaveFailedEvent)
def on_memory_save_failed(source, event: MemorySaveFailedEvent):
def on_memory_save_failed(source: Any, event: MemorySaveFailedEvent) -> None:
if not self.memory_save_in_progress:
return
@@ -107,4 +114,4 @@ class MemoryListener(BaseEventListener):
event.error,
event.source_type,
self.formatter.current_crew_tree,
)
)

View File

@@ -11,7 +11,7 @@ from crewai.cli.plus_api import PlusAPI
from rich.console import Console
from rich.panel import Panel
from crewai.utilities.events.listeners.tracing.types import TraceEvent
from crewai.events.listeners.tracing.types import TraceEvent
from logging import getLogger
logger = getLogger(__name__)
@@ -41,18 +41,21 @@ class TraceBatchManager:
"""Single responsibility: Manage batches and event buffering"""
is_current_batch_ephemeral: bool = False
trace_batch_id: Optional[str] = None
current_batch: Optional[TraceBatch] = None
event_buffer: List[TraceEvent] = []
execution_start_times: Dict[str, datetime] = {}
batch_owner_type: Optional[str] = None
batch_owner_id: Optional[str] = None
def __init__(self):
try:
self.plus_api = PlusAPI(api_key=get_auth_token())
self.plus_api = PlusAPI(
api_key=get_auth_token(),
)
except AuthError:
self.plus_api = PlusAPI(api_key="")
self.trace_batch_id: Optional[str] = None # Backend ID
self.current_batch: Optional[TraceBatch] = None
self.event_buffer: List[TraceEvent] = []
self.execution_start_times: Dict[str, datetime] = {}
def initialize_batch(
self,
user_context: Dict[str, str],
@@ -113,7 +116,13 @@ class TraceBatchManager:
else self.plus_api.initialize_trace_batch(payload)
)
if response.status_code == 201 or response.status_code == 200:
if response is None:
logger.warning(
"Trace batch initialization failed gracefully. Continuing without tracing."
)
return
if response.status_code in [201, 200]:
response_data = response.json()
self.trace_batch_id = (
response_data["trace_id"]
@@ -128,21 +137,23 @@ class TraceBatchManager:
)
console.print(panel)
else:
logger.error(
f"❌ Failed to initialize trace batch: {response.status_code} - {response.text}"
logger.warning(
f"Trace batch initialization returned status {response.status_code}. Continuing without tracing."
)
except Exception as e:
logger.error(f"❌ Error initializing trace batch: {str(e)}")
logger.warning(
f"Error initializing trace batch: {str(e)}. Continuing without tracing."
)
def add_event(self, trace_event: TraceEvent):
"""Add event to buffer"""
self.event_buffer.append(trace_event)
def _send_events_to_backend(self):
"""Send buffered events to backend"""
def _send_events_to_backend(self) -> int:
"""Send buffered events to backend with graceful failure handling"""
if not self.plus_api or not self.trace_batch_id or not self.event_buffer:
return
return 500
try:
payload = {
@@ -154,38 +165,48 @@ class TraceBatchManager:
},
}
if not self.trace_batch_id:
raise Exception("❌ Trace batch ID not found")
response = (
self.plus_api.send_ephemeral_trace_events(self.trace_batch_id, payload)
if self.is_current_batch_ephemeral
else self.plus_api.send_trace_events(self.trace_batch_id, payload)
)
if response.status_code == 200 or response.status_code == 201:
if response is None:
logger.warning("Failed to send trace events. Events will be lost.")
return 500
if response.status_code in [200, 201]:
self.event_buffer.clear()
return 200
else:
logger.error(
f"Failed to send events: {response.status_code} - {response.text}"
logger.warning(
f"Failed to send events: {response.status_code}. Events will be lost."
)
return 500
except Exception as e:
logger.error(f"❌ Error sending events to backend: {str(e)}")
logger.warning(
f"Error sending events to backend: {str(e)}. Events will be lost."
)
return 500
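The int return value doubles as a coarse HTTP-style status for finalize_batch below: every exit path yields 200 or 500 instead of raising, so the caller can branch without try/except. A minimal sketch of that contract (names are mine):

def send_events(response_status: int | None) -> int:
    # None models a failed transport; anything but 200/201 is treated as loss.
    if response_status is None:
        return 500
    return 200 if response_status in (200, 201) else 500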
def finalize_batch(self) -> Optional[TraceBatch]:
"""Finalize batch and return it for sending"""
if not self.current_batch:
return None
self.current_batch.events = self.event_buffer.copy()
if self.event_buffer:
self._send_events_to_backend()
events_sent_to_backend_status = self._send_events_to_backend()
if events_sent_to_backend_status == 500:
return None
self._finalize_backend_batch()
self.current_batch.events = self.event_buffer.copy()
finalized_batch = self.current_batch
self.batch_owner_type = None
self.batch_owner_id = None
self.current_batch = None
self.event_buffer.clear()
self.trace_batch_id = None

View File

@@ -1,73 +1,70 @@
import os
import uuid
from typing import Any, Optional
from typing import Dict, Any, Optional
from typing_extensions import Self
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.agent_events import (
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.event_bus import CrewAIEventsBus
from crewai.events.listeners.tracing.trace_batch_manager import TraceBatchManager
from crewai.events.listeners.tracing.types import TraceEvent
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
AgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.utilities.events.listeners.tracing.types import TraceEvent
from crewai.utilities.events.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
)
from crewai.utilities.events.crew_events import (
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
)
from crewai.utilities.events.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
MethodExecutionFailedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.utilities.events.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.llm_events import (
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
)
from crewai.utilities.events.flow_events import (
FlowCreatedEvent,
FlowStartedEvent,
FlowFinishedEvent,
MethodExecutionStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionFailedEvent,
FlowPlotEvent,
)
from crewai.utilities.events.llm_guardrail_events import (
LLMGuardrailStartedEvent,
from crewai.events.types.llm_guardrail_events import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from crewai.utilities.serialization import to_serializable
from .trace_batch_manager import TraceBatchManager
from crewai.utilities.events.memory_events import (
MemoryQueryStartedEvent,
from crewai.events.types.memory_events import (
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
MemorySaveStartedEvent,
MemoryQueryStartedEvent,
MemorySaveCompletedEvent,
MemorySaveFailedEvent,
MemorySaveStartedEvent,
)
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.events.types.reasoning_events import (
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
AgentReasoningStartedEvent,
)
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.serialization import to_serializable
class TraceCollectionListener(BaseEventListener):
@@ -75,12 +72,20 @@ class TraceCollectionListener(BaseEventListener):
Trace collection listener that orchestrates trace collection
"""
complex_events = ["task_started", "llm_call_started", "llm_call_completed"]
complex_events = [
"task_started",
"task_completed",
"llm_call_started",
"llm_call_completed",
"agent_execution_started",
"agent_execution_completed",
]
_instance = None
_initialized = False
_listeners_setup = False
def __new__(cls, batch_manager=None):
def __new__(cls, batch_manager: Optional[Any] = None) -> Self:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
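`__new__` plus the class-level `_instance`/`_initialized` flags make the listener a process-wide singleton: construction always hands back the same object, and the `_initialized` guard in `__init__` (below) keeps repeated construction from clobbering its state. The idiom in isolation, as a compact sketch:

from typing import Any, Optional
from typing_extensions import Self

class SingletonListener:
    _instance: Optional["SingletonListener"] = None
    _initialized = False

    def __new__(cls, batch_manager: Optional[Any] = None) -> Self:
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, batch_manager: Optional[Any] = None) -> None:
        if self._initialized:
            return  # __init__ runs on every construction; bail after the first
        self.batch_manager = batch_manager
        type(self)._initialized = True

assert SingletonListener() is SingletonListener()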
@@ -93,10 +98,11 @@ class TraceCollectionListener(BaseEventListener):
return
super().__init__()
self.batch_manager = batch_manager or TraceBatchManager()
self.batch_manager = batch_manager or TraceBatchManager() # type: ignore
self._initialized = True
def _check_authenticated(self) -> bool:
@staticmethod
def _check_authenticated() -> bool:
"""Check if tracing should be enabled"""
try:
res = bool(get_auth_token())
@@ -104,7 +110,8 @@ class TraceCollectionListener(BaseEventListener):
except AuthError:
return False
def _get_user_context(self) -> Dict[str, str]:
@staticmethod
def _get_user_context() -> dict[str, str]:
"""Extract user context for tracing"""
return {
"user_id": os.getenv("CREWAI_USER_ID", "anonymous"),
@@ -113,174 +120,181 @@ class TraceCollectionListener(BaseEventListener):
"trace_id": str(uuid.uuid4()),
}
def setup_listeners(self, crewai_event_bus):
def setup_listeners(self, crewai_event_bus: CrewAIEventsBus) -> None:
"""Setup event listeners - delegates to specific handlers"""
if self._listeners_setup:
return
self._register_flow_event_handlers(crewai_event_bus)
self._register_context_event_handlers(crewai_event_bus)
self._register_action_event_handlers(crewai_event_bus)
def _register_flow_event_handlers(self, event_bus):
self._listeners_setup = True
def _register_flow_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
"""Register handlers for flow events"""
@event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event):
def on_flow_created(source: Any, event: Any) -> None:
pass
@event_bus.on(FlowStartedEvent)
def on_flow_started(source, event):
def on_flow_started(source: Any, event: Any) -> None:
if not self.batch_manager.is_batch_initialized():
self._initialize_flow_batch(source, event)
self._handle_trace_event("flow_started", source, event)
@event_bus.on(MethodExecutionStartedEvent)
def on_method_started(source, event):
def on_method_started(source: Any, event: Any) -> None:
self._handle_trace_event("method_execution_started", source, event)
@event_bus.on(MethodExecutionFinishedEvent)
def on_method_finished(source, event):
def on_method_finished(source: Any, event: Any) -> None:
self._handle_trace_event("method_execution_finished", source, event)
@event_bus.on(MethodExecutionFailedEvent)
def on_method_failed(source, event):
def on_method_failed(source: Any, event: Any) -> None:
self._handle_trace_event("method_execution_failed", source, event)
@event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event):
def on_flow_finished(source: Any, event: Any) -> None:
self._handle_trace_event("flow_finished", source, event)
self.batch_manager.finalize_batch()
if self.batch_manager.batch_owner_type == "flow":
self.batch_manager.finalize_batch()
@event_bus.on(FlowPlotEvent)
def on_flow_plot(source, event):
def on_flow_plot(source: Any, event: Any) -> None:
self._handle_action_event("flow_plot", source, event)
def _register_context_event_handlers(self, event_bus):
def _register_context_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
"""Register handlers for context events (start/end)"""
@event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event):
def on_crew_started(source: Any, event: Any) -> None:
if not self.batch_manager.is_batch_initialized():
self._initialize_crew_batch(source, event)
self._handle_trace_event("crew_kickoff_started", source, event)
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
def on_crew_completed(source: Any, event: Any) -> None:
self._handle_trace_event("crew_kickoff_completed", source, event)
self.batch_manager.finalize_batch()
if self.batch_manager.batch_owner_type == "crew":
self.batch_manager.finalize_batch()
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event):
def on_crew_failed(source: Any, event: Any) -> None:
self._handle_trace_event("crew_kickoff_failed", source, event)
self.batch_manager.finalize_batch()
@event_bus.on(TaskStartedEvent)
def on_task_started(source, event):
def on_task_started(source: Any, event: Any) -> None:
self._handle_trace_event("task_started", source, event)
@event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event):
def on_task_completed(source: Any, event: Any) -> None:
self._handle_trace_event("task_completed", source, event)
@event_bus.on(TaskFailedEvent)
def on_task_failed(source, event):
def on_task_failed(source: Any, event: Any) -> None:
self._handle_trace_event("task_failed", source, event)
@event_bus.on(AgentExecutionStartedEvent)
def on_agent_started(source, event):
def on_agent_started(source: Any, event: Any) -> None:
self._handle_trace_event("agent_execution_started", source, event)
@event_bus.on(AgentExecutionCompletedEvent)
def on_agent_completed(source, event):
def on_agent_completed(source: Any, event: Any) -> None:
self._handle_trace_event("agent_execution_completed", source, event)
@event_bus.on(LiteAgentExecutionStartedEvent)
def on_lite_agent_started(source, event):
def on_lite_agent_started(source: Any, event: Any) -> None:
self._handle_trace_event("lite_agent_execution_started", source, event)
@event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_completed(source, event):
def on_lite_agent_completed(source: Any, event: Any) -> None:
self._handle_trace_event("lite_agent_execution_completed", source, event)
@event_bus.on(LiteAgentExecutionErrorEvent)
def on_lite_agent_error(source, event):
def on_lite_agent_error(source: Any, event: Any) -> None:
self._handle_trace_event("lite_agent_execution_error", source, event)
@event_bus.on(AgentExecutionErrorEvent)
def on_agent_error(source, event):
def on_agent_error(source: Any, event: Any) -> None:
self._handle_trace_event("agent_execution_error", source, event)
@event_bus.on(LLMGuardrailStartedEvent)
def on_guardrail_started(source, event):
def on_guardrail_started(source: Any, event: Any) -> None:
self._handle_trace_event("llm_guardrail_started", source, event)
@event_bus.on(LLMGuardrailCompletedEvent)
def on_guardrail_completed(source, event):
def on_guardrail_completed(source: Any, event: Any) -> None:
self._handle_trace_event("llm_guardrail_completed", source, event)
def _register_action_event_handlers(self, event_bus):
"""Register handlers for action events (LLM calls, tool usage, memory)"""
def _register_action_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
"""Register handlers for action events (LLM calls, tool usage)"""
@event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event):
def on_llm_call_started(source: Any, event: Any) -> None:
self._handle_action_event("llm_call_started", source, event)
@event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event):
def on_llm_call_completed(source: Any, event: Any) -> None:
self._handle_action_event("llm_call_completed", source, event)
@event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event):
def on_llm_call_failed(source: Any, event: Any) -> None:
self._handle_action_event("llm_call_failed", source, event)
@event_bus.on(ToolUsageStartedEvent)
def on_tool_started(source, event):
def on_tool_started(source: Any, event: Any) -> None:
self._handle_action_event("tool_usage_started", source, event)
@event_bus.on(ToolUsageFinishedEvent)
def on_tool_finished(source, event):
def on_tool_finished(source: Any, event: Any) -> None:
self._handle_action_event("tool_usage_finished", source, event)
@event_bus.on(ToolUsageErrorEvent)
def on_tool_error(source, event):
def on_tool_error(source: Any, event: Any) -> None:
self._handle_action_event("tool_usage_error", source, event)
@event_bus.on(MemoryQueryStartedEvent)
def on_memory_query_started(source, event):
def on_memory_query_started(source: Any, event: Any) -> None:
self._handle_action_event("memory_query_started", source, event)
@event_bus.on(MemoryQueryCompletedEvent)
def on_memory_query_completed(source, event):
def on_memory_query_completed(source: Any, event: Any) -> None:
self._handle_action_event("memory_query_completed", source, event)
@event_bus.on(MemoryQueryFailedEvent)
def on_memory_query_failed(source, event):
def on_memory_query_failed(source: Any, event: Any) -> None:
self._handle_action_event("memory_query_failed", source, event)
@event_bus.on(MemorySaveStartedEvent)
def on_memory_save_started(source, event):
def on_memory_save_started(source: Any, event: Any) -> None:
self._handle_action_event("memory_save_started", source, event)
@event_bus.on(MemorySaveCompletedEvent)
def on_memory_save_completed(source, event):
def on_memory_save_completed(source: Any, event: Any) -> None:
self._handle_action_event("memory_save_completed", source, event)
@event_bus.on(MemorySaveFailedEvent)
def on_memory_save_failed(source, event):
def on_memory_save_failed(source: Any, event: Any) -> None:
self._handle_action_event("memory_save_failed", source, event)
@event_bus.on(AgentReasoningStartedEvent)
def on_agent_reasoning_started(source, event):
def on_agent_reasoning_started(source: Any, event: Any) -> None:
self._handle_action_event("agent_reasoning_started", source, event)
@event_bus.on(AgentReasoningCompletedEvent)
def on_agent_reasoning_completed(source, event):
def on_agent_reasoning_completed(source: Any, event: Any) -> None:
self._handle_action_event("agent_reasoning_completed", source, event)
@event_bus.on(AgentReasoningFailedEvent)
def on_agent_reasoning_failed(source, event):
def on_agent_reasoning_failed(source: Any, event: Any) -> None:
self._handle_action_event("agent_reasoning_failed", source, event)
def _initialize_crew_batch(self, source: Any, event: Any):
def _initialize_crew_batch(self, source: Any, event: Any) -> None:
"""Initialize trace batch"""
user_context = self._get_user_context()
execution_metadata = {
@@ -289,9 +303,12 @@ class TraceCollectionListener(BaseEventListener):
"crewai_version": get_crewai_version(),
}
self.batch_manager.batch_owner_type = "crew"
self.batch_manager.batch_owner_id = getattr(source, "id", str(uuid.uuid4()))
self._initialize_batch(user_context, execution_metadata)
def _initialize_flow_batch(self, source: Any, event: Any):
def _initialize_flow_batch(self, source: Any, event: Any) -> None:
"""Initialize trace batch for Flow execution"""
user_context = self._get_user_context()
execution_metadata = {
@@ -301,29 +318,30 @@ class TraceCollectionListener(BaseEventListener):
"execution_type": "flow",
}
self.batch_manager.batch_owner_type = "flow"
self.batch_manager.batch_owner_id = getattr(source, "id", str(uuid.uuid4()))
self._initialize_batch(user_context, execution_metadata)
def _initialize_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
):
self, user_context: dict[str, str], execution_metadata: dict[str, Any]
) -> None:
"""Initialize trace batch if ephemeral"""
if not self._check_authenticated():
self.batch_manager.initialize_batch(
user_context, execution_metadata, use_ephemeral=True
)
else:
self.batch_manager.initialize_batch(
user_context, execution_metadata, use_ephemeral=False
)
self.batch_manager.initialize_batch(user_context, execution_metadata)
def _handle_trace_event(self, event_type: str, source: Any, event: Any):
def _handle_trace_event(self, event_type: str, source: Any, event: Any) -> None:
"""Generic handler for context end events"""
trace_event = self._create_trace_event(event_type, source, event)
self.batch_manager.add_event(trace_event)
def _handle_action_event(self, event_type: str, source: Any, event: Any):
def _handle_action_event(self, event_type: str, source: Any, event: Any) -> None:
"""Generic handler for action events (LLM calls, tool usage)"""
if not self.batch_manager.is_batch_initialized():
@@ -350,7 +368,7 @@ class TraceCollectionListener(BaseEventListener):
def _build_event_data(
self, event_type: str, event: Any, source: Any
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Build event data"""
if event_type not in self.complex_events:
return self._safe_serialize_to_dict(event)
@@ -358,12 +376,44 @@ class TraceCollectionListener(BaseEventListener):
return {
"task_description": event.task.description,
"expected_output": event.task.expected_output,
"task_name": event.task.name,
"task_name": event.task.name or event.task.description,
"context": event.context,
"agent": source.agent.role,
"agent_role": source.agent.role,
"task_id": str(event.task.id),
}
elif event_type == "task_completed":
return {
"task_description": event.task.description if event.task else None,
"task_name": event.task.name or event.task.description
if event.task
else None,
"task_id": str(event.task.id) if event.task else None,
"output_raw": event.output.raw if event.output else None,
"output_format": str(event.output.output_format)
if event.output
else None,
"agent_role": event.output.agent if event.output else None,
}
elif event_type == "agent_execution_started":
return {
"agent_role": event.agent.role,
"agent_goal": event.agent.goal,
"agent_backstory": event.agent.backstory,
}
elif event_type == "agent_execution_completed":
return {
"agent_role": event.agent.role,
"agent_goal": event.agent.goal,
"agent_backstory": event.agent.backstory,
}
elif event_type == "llm_call_started":
return self._safe_serialize_to_dict(event)
event_data = self._safe_serialize_to_dict(event)
event_data["task_name"] = (
event.task_name or event.task_description
if hasattr(event, "task_name") and event.task_name
else None
)
return event_data
elif event_type == "llm_call_completed":
return self._safe_serialize_to_dict(event)
else:
@@ -373,11 +423,19 @@ class TraceCollectionListener(BaseEventListener):
"source": source,
}
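The `task_completed` branch above guards every attribute access because both `event.task` and `event.output` may be `None`. The same None-safe payload construction, reduced to a sketch over duck-typed event objects:

from typing import Any, Optional

def build_task_completed_payload(event: Any) -> dict[str, Optional[Any]]:
    task, output = event.task, event.output
    return {
        "task_description": task.description if task else None,
        "task_name": (task.name or task.description) if task else None,
        "task_id": str(task.id) if task else None,
        "output_raw": output.raw if output else None,
        "output_format": str(output.output_format) if output else None,
        "agent_role": output.agent if output else None,
    }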
# TODO: move to utils
@staticmethod
def _safe_serialize_to_dict(
self, obj, exclude: set[str] | None = None
) -> Dict[str, Any]:
"""Safely serialize an object to a dictionary for event data."""
obj: Any, exclude: set[str] | None = None
) -> dict[str, Any]:
"""Safely serialize an object to a dictionary for event data.
Args:
obj: The object to serialize.
exclude: Optional set of attribute names to exclude from serialization.
Notes:
- TODO: refactor to utilities function.
"""
try:
serialized = to_serializable(obj, exclude)
if isinstance(serialized, dict):
@@ -387,9 +445,20 @@ class TraceCollectionListener(BaseEventListener):
except Exception as e:
return {"serialization_error": str(e), "object_type": type(obj).__name__}
# TODO: move to utils
def _truncate_messages(self, messages, max_content_length=500, max_messages=5):
"""Truncate message content and limit number of messages"""
@staticmethod
def _truncate_messages(
messages: Any, max_content_length: int = 500, max_messages: int = 5
) -> Any:
"""Truncate message content and limit number of messages
Args:
messages: List of message dicts with 'content' keys.
max_content_length: Max length of each message content.
max_messages: Max number of messages to retain.
Notes:
- TODO: refactor to utilities function.
"""
if not messages or not isinstance(messages, list):
return messages
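The rest of `_truncate_messages` is elided in this view; one plausible reading of the docstring (cap each `content` string and keep at most `max_messages` entries) is sketched below. This is an assumption for illustration, not the actual implementation:

from typing import Any

def truncate_messages(
    messages: Any, max_content_length: int = 500, max_messages: int = 5
) -> Any:
    if not messages or not isinstance(messages, list):
        return messages
    truncated = []
    for message in messages[:max_messages]:  # assumption: keep the first N messages
        msg = dict(message)
        content = msg.get("content")
        if isinstance(content, str) and len(content) > max_content_length:
            msg["content"] = content[:max_content_length] + "..."  # cap long content
        truncated.append(msg)
    return truncated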


@@ -0,0 +1,5 @@
"""Event type definitions for CrewAI.
This module contains all event types used throughout the CrewAI system
for monitoring and extending agent, crew, task, and tool execution.
"""


@@ -1,13 +1,15 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
"""Agent-related events moved to break circular dependencies."""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Sequence, Union
from pydantic import model_validator
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from .base_events import BaseEvent
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.base_events import BaseEvent
class AgentExecutionStartedEvent(BaseEvent):
@@ -21,9 +23,9 @@ class AgentExecutionStartedEvent(BaseEvent):
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
@model_validator(mode="after")
def set_fingerprint_data(self):
"""Set fingerprint data from the agent if available."""
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
@@ -32,6 +34,7 @@ class AgentExecutionStartedEvent(BaseEvent):
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
return self
class AgentExecutionCompletedEvent(BaseEvent):
@@ -42,9 +45,11 @@ class AgentExecutionCompletedEvent(BaseEvent):
output: str
type: str = "agent_execution_completed"
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
model_config = {"arbitrary_types_allowed": True}
@model_validator(mode="after")
def set_fingerprint_data(self):
"""Set fingerprint data from the agent if available."""
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
@@ -53,6 +58,7 @@ class AgentExecutionCompletedEvent(BaseEvent):
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
return self
class AgentExecutionErrorEvent(BaseEvent):
@@ -63,9 +69,11 @@ class AgentExecutionErrorEvent(BaseEvent):
error: str
type: str = "agent_execution_error"
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
model_config = {"arbitrary_types_allowed": True}
@model_validator(mode="after")
def set_fingerprint_data(self):
"""Set fingerprint data from the agent if available."""
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
@@ -74,6 +82,7 @@ class AgentExecutionErrorEvent(BaseEvent):
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
return self
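All three event classes move fingerprint propagation out of a custom `__init__` and into a pydantic v2 `@model_validator(mode="after")`, which runs on the fully constructed model and must return `self`. A self-contained sketch of the migration target (simplified models, not the real ones):

from typing import Optional
from pydantic import BaseModel, model_validator

class Fingerprint(BaseModel):
    uuid_str: str

class Agent(BaseModel):
    role: str
    fingerprint: Optional[Fingerprint] = None

class AgentEvent(BaseModel):
    agent: Agent
    source_fingerprint: Optional[str] = None
    source_type: Optional[str] = None

    @model_validator(mode="after")
    def set_fingerprint_data(self):
        """Copy fingerprint data from the agent once all fields are set."""
        if self.agent.fingerprint:
            self.source_fingerprint = self.agent.fingerprint.uuid_str
            self.source_type = "agent"
        return self

event = AgentEvent(agent=Agent(role="r", fingerprint=Fingerprint(uuid_str="abc")))
assert event.source_fingerprint == "abc"

A common motivation for this change: an after-validator also fires on `model_validate`/deserialization paths, which a Python-level `__init__` override does not.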
# New event classes for LiteAgent
@@ -104,26 +113,6 @@ class LiteAgentExecutionErrorEvent(BaseEvent):
type: str = "lite_agent_execution_error"
# New logging events
class AgentLogsStartedEvent(BaseEvent):
"""Event emitted when agent logs should be shown at start"""
agent_role: str
task_description: Optional[str] = None
verbose: bool = False
type: str = "agent_logs_started"
class AgentLogsExecutionEvent(BaseEvent):
"""Event emitted when agent logs should be shown during execution"""
agent_role: str
formatted_answer: Any
verbose: bool = False
type: str = "agent_logs_execution"
model_config = {"arbitrary_types_allowed": True}
# Agent Eval events
class AgentEvaluationStartedEvent(BaseEvent):
agent_id: str
@@ -132,6 +121,7 @@ class AgentEvaluationStartedEvent(BaseEvent):
iteration: int
type: str = "agent_evaluation_started"
class AgentEvaluationCompletedEvent(BaseEvent):
agent_id: str
agent_role: str
@@ -141,6 +131,7 @@ class AgentEvaluationCompletedEvent(BaseEvent):
score: Any
type: str = "agent_evaluation_completed"
class AgentEvaluationFailedEvent(BaseEvent):
agent_id: str
agent_role: str


@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from crewai.utilities.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent
if TYPE_CHECKING:
from crewai.crew import Crew


@@ -2,7 +2,7 @@ from typing import Any, Dict, Optional, Union
from pydantic import BaseModel, ConfigDict
from .base_events import BaseEvent
from crewai.events.base_events import BaseEvent
class FlowEvent(BaseEvent):


@@ -1,10 +1,6 @@
from typing import TYPE_CHECKING, Any
from crewai.events.base_events import BaseEvent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.utilities.events.base_events import BaseEvent
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
class KnowledgeRetrievalStartedEvent(BaseEvent):
@@ -20,7 +16,7 @@ class KnowledgeRetrievalCompletedEvent(BaseEvent):
query: str
type: str = "knowledge_search_query_completed"
agent: BaseAgent
retrieved_knowledge: Any
retrieved_knowledge: str
class KnowledgeQueryStartedEvent(BaseEvent):


@@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel
from crewai.utilities.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent
class LLMEventBase(BaseEvent):
@@ -13,26 +13,14 @@ class LLMEventBase(BaseEvent):
agent_id: Optional[str] = None
agent_role: Optional[str] = None
from_task: Optional[Any] = None
from_agent: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
self._set_agent_params(data)
self._set_task_params(data)
def _set_agent_params(self, data: Dict[str, Any]):
task = data.get("from_task", None)
agent = task.agent if task else data.get("from_agent", None)
if not agent:
return
self.agent_id = agent.id
self.agent_role = agent.role
def _set_task_params(self, data: Dict[str, Any]):
if "from_task" in data and (task := data["from_task"]):
self.task_id = task.id
self.task_name = task.name
class LLMCallType(Enum):
"""Type of LLM call being made"""


@@ -1,7 +1,7 @@
from inspect import getsource
from typing import Any, Callable, Optional, Union
from crewai.utilities.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent
class LLMGuardrailStartedEvent(BaseEvent):


@@ -0,0 +1,25 @@
"""Agent logging events that don't reference BaseAgent to avoid circular imports."""
from typing import Any, Optional
from crewai.events.base_events import BaseEvent
class AgentLogsStartedEvent(BaseEvent):
"""Event emitted when agent logs should be shown at start"""
agent_role: str
task_description: Optional[str] = None
verbose: bool = False
type: str = "agent_logs_started"
class AgentLogsExecutionEvent(BaseEvent):
"""Event emitted when agent logs should be shown during execution"""
agent_role: str
formatted_answer: Any
verbose: bool = False
type: str = "agent_logs_execution"
model_config = {"arbitrary_types_allowed": True}


@@ -1,9 +1,26 @@
from typing import Any, Dict, Optional
from crewai.utilities.events.base_events import BaseEvent
from crewai.events.base_events import BaseEvent
class MemoryQueryStartedEvent(BaseEvent):
class MemoryBaseEvent(BaseEvent):
"""Base event for memory operations"""
type: str
task_id: Optional[str] = None
task_name: Optional[str] = None
from_task: Optional[Any] = None
from_agent: Optional[Any] = None
agent_role: Optional[str] = None
agent_id: Optional[str] = None
def __init__(self, **data):
super().__init__(**data)
self._set_agent_params(data)
self._set_task_params(data)
class MemoryQueryStartedEvent(MemoryBaseEvent):
"""Event emitted when a memory query is started"""
type: str = "memory_query_started"
@@ -12,7 +29,7 @@ class MemoryQueryStartedEvent(BaseEvent):
score_threshold: Optional[float] = None
class MemoryQueryCompletedEvent(BaseEvent):
class MemoryQueryCompletedEvent(MemoryBaseEvent):
"""Event emitted when a memory query is completed successfully"""
type: str = "memory_query_completed"
@@ -23,7 +40,7 @@ class MemoryQueryCompletedEvent(BaseEvent):
query_time_ms: float
class MemoryQueryFailedEvent(BaseEvent):
class MemoryQueryFailedEvent(MemoryBaseEvent):
"""Event emitted when a memory query fails"""
type: str = "memory_query_failed"
@@ -33,7 +50,7 @@ class MemoryQueryFailedEvent(BaseEvent):
error: str
class MemorySaveStartedEvent(BaseEvent):
class MemorySaveStartedEvent(MemoryBaseEvent):
"""Event emitted when a memory save operation is started"""
type: str = "memory_save_started"
@@ -42,7 +59,7 @@ class MemorySaveStartedEvent(BaseEvent):
agent_role: Optional[str] = None
class MemorySaveCompletedEvent(BaseEvent):
class MemorySaveCompletedEvent(MemoryBaseEvent):
"""Event emitted when a memory save operation is completed successfully"""
type: str = "memory_save_completed"
@@ -52,7 +69,7 @@ class MemorySaveCompletedEvent(BaseEvent):
save_time_ms: float
class MemorySaveFailedEvent(BaseEvent):
class MemorySaveFailedEvent(MemoryBaseEvent):
"""Event emitted when a memory save operation fails"""
type: str = "memory_save_failed"
@@ -62,14 +79,14 @@ class MemorySaveFailedEvent(BaseEvent):
error: str
class MemoryRetrievalStartedEvent(BaseEvent):
class MemoryRetrievalStartedEvent(MemoryBaseEvent):
"""Event emitted when memory retrieval for a task prompt starts"""
type: str = "memory_retrieval_started"
task_id: Optional[str] = None
class MemoryRetrievalCompletedEvent(BaseEvent):
class MemoryRetrievalCompletedEvent(MemoryBaseEvent):
"""Event emitted when memory retrieval for a task prompt completes successfully"""
type: str = "memory_retrieval_completed"


@@ -0,0 +1,47 @@
from crewai.events.base_events import BaseEvent
from typing import Any, Optional
class ReasoningEvent(BaseEvent):
"""Base event for reasoning events."""
type: str
attempt: int = 1
agent_role: str
task_id: str
task_name: Optional[str] = None
from_task: Optional[Any] = None
agent_id: Optional[str] = None
from_agent: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
self._set_task_params(data)
self._set_agent_params(data)
class AgentReasoningStartedEvent(ReasoningEvent):
"""Event emitted when an agent starts reasoning about a task."""
type: str = "agent_reasoning_started"
agent_role: str
task_id: str
class AgentReasoningCompletedEvent(ReasoningEvent):
"""Event emitted when an agent finishes its reasoning process."""
type: str = "agent_reasoning_completed"
agent_role: str
task_id: str
plan: str
ready: bool
class AgentReasoningFailedEvent(ReasoningEvent):
"""Event emitted when the reasoning process fails."""
type: str = "agent_reasoning_failed"
agent_role: str
task_id: str
error: str


@@ -1,7 +1,7 @@
from typing import Any, Optional
from crewai.events.base_events import BaseEvent
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.events.base_events import BaseEvent
class TaskStartedEvent(BaseEvent):
@@ -11,14 +11,15 @@ class TaskStartedEvent(BaseEvent):
context: Optional[str]
task: Optional[Any] = None
def __init__(self, **data):
def __init__(self, **data: Any) -> None:
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
if self.task and hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
self.task
and hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
@@ -31,14 +32,15 @@ class TaskCompletedEvent(BaseEvent):
type: str = "task_completed"
task: Optional[Any] = None
def __init__(self, **data):
def __init__(self, **data: Any) -> None:
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
if self.task and hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
self.task
and hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
@@ -51,14 +53,15 @@ class TaskFailedEvent(BaseEvent):
type: str = "task_failed"
task: Optional[Any] = None
def __init__(self, **data):
def __init__(self, **data: Any) -> None:
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
if self.task and hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
self.task
and hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
@@ -71,14 +74,15 @@ class TaskEvaluationEvent(BaseEvent):
evaluation_type: str
task: Optional[Any] = None
def __init__(self, **data):
def __init__(self, **data: Any) -> None:
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
if self.task and hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
self.task
and hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata


@@ -1,7 +1,7 @@
from datetime import datetime
from typing import Any, Callable, Dict, Optional
from .base_events import BaseEvent
from crewai.events.base_events import BaseEvent
class ToolUsageEvent(BaseEvent):
@@ -9,17 +9,24 @@ class ToolUsageEvent(BaseEvent):
agent_key: Optional[str] = None
agent_role: Optional[str] = None
agent_id: Optional[str] = None
tool_name: str
tool_args: Dict[str, Any] | str
tool_class: Optional[str] = None
run_attempts: int | None = None
delegations: int | None = None
agent: Optional[Any] = None
task_name: Optional[str] = None
task_id: Optional[str] = None
from_task: Optional[Any] = None
from_agent: Optional[Any] = None
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
self._set_agent_params(data)
self._set_task_params(data)
# Set fingerprint data from the agent
if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str


@@ -227,7 +227,7 @@ class ConsoleFormatter:
return None
task_content = Text()
# Display task name if available, otherwise just the ID
if task_name:
task_content.append("📋 Task: ", style="yellow bold")
@@ -235,7 +235,7 @@ class ConsoleFormatter:
task_content.append(f" (ID: {task_id})", style="yellow dim")
else:
task_content.append(f"📋 Task: {task_id}", style="yellow bold")
task_content.append("\nStatus: ", style="white")
task_content.append("Executing Task...", style="yellow dim")


@@ -1,28 +1,42 @@
import threading
from typing import Any
from typing import Any, Optional
from crewai.experimental.evaluation.base_evaluator import AgentEvaluationResult, AggregationStrategy
from crewai.experimental.evaluation.base_evaluator import (
AgentEvaluationResult,
AggregationStrategy,
)
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.evaluation_display import EvaluationDisplayFormatter
from crewai.utilities.events.agent_events import AgentEvaluationStartedEvent, AgentEvaluationCompletedEvent, AgentEvaluationFailedEvent
from crewai.events.types.agent_events import (
AgentEvaluationStartedEvent,
AgentEvaluationCompletedEvent,
AgentEvaluationFailedEvent,
)
from crewai.experimental.evaluation import BaseEvaluator, create_evaluation_callbacks
from collections.abc import Sequence
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.events.task_events import TaskCompletedEvent
from crewai.utilities.events.agent_events import LiteAgentExecutionCompletedEvent
from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult, EvaluationScore, MetricCategory
from crewai.events.event_bus import crewai_event_bus
from crewai.events.utils.console_formatter import ConsoleFormatter
from crewai.events.types.task_events import TaskCompletedEvent
from crewai.events.types.agent_events import LiteAgentExecutionCompletedEvent
from crewai.experimental.evaluation.base_evaluator import (
AgentAggregatedEvaluationResult,
EvaluationScore,
MetricCategory,
)
class ExecutionState:
current_agent_id: Optional[str] = None
current_task_id: Optional[str] = None
def __init__(self):
self.traces = {}
self.current_agent_id: str | None = None
self.current_task_id: str | None = None
self.iteration = 1
self.iterations_results = {}
self.agent_evaluators = {}
class AgentEvaluator:
def __init__(
self,
@@ -45,27 +59,45 @@ class AgentEvaluator:
@property
def _execution_state(self) -> ExecutionState:
if not hasattr(self._thread_local, 'execution_state'):
if not hasattr(self._thread_local, "execution_state"):
self._thread_local.execution_state = ExecutionState()
return self._thread_local.execution_state
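`_execution_state` is created lazily per thread via `threading.local()`, so concurrent evaluations keep separate iteration counters and traces. The pattern on its own, as a runnable sketch (`Evaluator` is an illustrative holder, not the real `AgentEvaluator`):

import threading

class ExecutionState:
    def __init__(self) -> None:
        self.iteration = 1
        self.traces: dict = {}

class Evaluator:
    def __init__(self) -> None:
        self._thread_local = threading.local()

    @property
    def _execution_state(self) -> ExecutionState:
        if not hasattr(self._thread_local, "execution_state"):
            self._thread_local.execution_state = ExecutionState()
        return self._thread_local.execution_state

ev = Evaluator()

def bump() -> None:
    ev._execution_state.iteration += 1  # mutates this worker thread's copy only

t = threading.Thread(target=bump)
t.start()
t.join()
assert ev._execution_state.iteration == 1  # main thread's state is untouched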
def _subscribe_to_events(self) -> None:
from typing import cast
crewai_event_bus.register_handler(TaskCompletedEvent, cast(Any, self._handle_task_completed))
crewai_event_bus.register_handler(LiteAgentExecutionCompletedEvent, cast(Any, self._handle_lite_agent_completed))
crewai_event_bus.register_handler(
TaskCompletedEvent, cast(Any, self._handle_task_completed)
)
crewai_event_bus.register_handler(
LiteAgentExecutionCompletedEvent,
cast(Any, self._handle_lite_agent_completed),
)
def _handle_task_completed(self, source: Any, event: TaskCompletedEvent) -> None:
assert event.task is not None
agent = event.task.agent
if agent and str(getattr(agent, 'id', 'unknown')) in self._execution_state.agent_evaluators:
self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=str(event.task.id))
if (
agent
and str(getattr(agent, "id", "unknown"))
in self._execution_state.agent_evaluators
):
self.emit_evaluation_started_event(
agent_role=agent.role,
agent_id=str(agent.id),
task_id=str(event.task.id),
)
state = ExecutionState()
state.current_agent_id = str(agent.id)
state.current_task_id = str(event.task.id)
assert state.current_agent_id is not None and state.current_task_id is not None
trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
assert (
state.current_agent_id is not None and state.current_task_id is not None
)
trace = self.callback.get_trace(
state.current_agent_id, state.current_task_id
)
if not trace:
return
@@ -75,19 +107,28 @@ class AgentEvaluator:
task=event.task,
execution_trace=trace,
final_output=event.output,
state=state
state=state,
)
current_iteration = self._execution_state.iteration
if current_iteration not in self._execution_state.iterations_results:
self._execution_state.iterations_results[current_iteration] = {}
if agent.role not in self._execution_state.iterations_results[current_iteration]:
self._execution_state.iterations_results[current_iteration][agent.role] = []
if (
agent.role
not in self._execution_state.iterations_results[current_iteration]
):
self._execution_state.iterations_results[current_iteration][
agent.role
] = []
self._execution_state.iterations_results[current_iteration][agent.role].append(result)
self._execution_state.iterations_results[current_iteration][
agent.role
].append(result)
def _handle_lite_agent_completed(self, source: object, event: LiteAgentExecutionCompletedEvent) -> None:
def _handle_lite_agent_completed(
self, source: object, event: LiteAgentExecutionCompletedEvent
) -> None:
agent_info = event.agent_info
agent_id = str(agent_info["id"])
@@ -105,8 +146,12 @@ class AgentEvaluator:
if not target_agent:
return
assert state.current_agent_id is not None and state.current_task_id is not None
trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)
assert (
state.current_agent_id is not None and state.current_task_id is not None
)
trace = self.callback.get_trace(
state.current_agent_id, state.current_task_id
)
if not trace:
return
@@ -115,7 +160,7 @@ class AgentEvaluator:
agent=target_agent,
execution_trace=trace,
final_output=event.output,
state=state
state=state,
)
current_iteration = self._execution_state.iteration
@@ -123,10 +168,17 @@ class AgentEvaluator:
self._execution_state.iterations_results[current_iteration] = {}
agent_role = target_agent.role
if agent_role not in self._execution_state.iterations_results[current_iteration]:
self._execution_state.iterations_results[current_iteration][agent_role] = []
if (
agent_role
not in self._execution_state.iterations_results[current_iteration]
):
self._execution_state.iterations_results[current_iteration][
agent_role
] = []
self._execution_state.iterations_results[current_iteration][agent_role].append(result)
self._execution_state.iterations_results[current_iteration][
agent_role
].append(result)
def set_iteration(self, iteration: int) -> None:
self._execution_state.iteration = iteration
@@ -135,14 +187,26 @@ class AgentEvaluator:
self._execution_state.iterations_results = {}
def get_evaluation_results(self) -> dict[str, list[AgentEvaluationResult]]:
if self._execution_state.iterations_results and self._execution_state.iteration in self._execution_state.iterations_results:
return self._execution_state.iterations_results[self._execution_state.iteration]
if (
self._execution_state.iterations_results
and self._execution_state.iteration
in self._execution_state.iterations_results
):
return self._execution_state.iterations_results[
self._execution_state.iteration
]
return {}
def display_results_with_iterations(self) -> None:
self.display_formatter.display_summary_results(self._execution_state.iterations_results)
self.display_formatter.display_summary_results(
self._execution_state.iterations_results
)
def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = True) -> dict[str, AgentAggregatedEvaluationResult]:
def get_agent_evaluation(
self,
strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE,
include_evaluation_feedback: bool = True,
) -> dict[str, AgentAggregatedEvaluationResult]:
agent_results = {}
with crewai_event_bus.scoped_handlers():
task_results = self.get_evaluation_results()
@@ -156,13 +220,16 @@ class AgentEvaluator:
agent_id=agent_id,
agent_role=agent_role,
results=results,
strategy=strategy
strategy=strategy,
)
agent_results[agent_role] = aggregated_result
if self._execution_state.iterations_results and self._execution_state.iteration == max(self._execution_state.iterations_results.keys(), default=0):
if (
self._execution_state.iterations_results
and self._execution_state.iteration
== max(self._execution_state.iterations_results.keys(), default=0)
):
self.display_results_with_iterations()
if include_evaluation_feedback:
@@ -171,7 +238,9 @@ class AgentEvaluator:
return agent_results
def display_evaluation_with_feedback(self) -> None:
self.display_formatter.display_evaluation_with_feedback(self._execution_state.iterations_results)
self.display_formatter.display_evaluation_with_feedback(
self._execution_state.iterations_results
)
def evaluate(
self,
@@ -183,46 +252,91 @@ class AgentEvaluator:
) -> AgentEvaluationResult:
result = AgentEvaluationResult(
agent_id=state.current_agent_id or str(agent.id),
task_id=state.current_task_id or (str(task.id) if task else "unknown_task")
task_id=state.current_task_id or (str(task.id) if task else "unknown_task"),
)
assert self.evaluators is not None
task_id = str(task.id) if task else None
for evaluator in self.evaluators:
try:
self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id)
self.emit_evaluation_started_event(
agent_role=agent.role, agent_id=str(agent.id), task_id=task_id
)
score = evaluator.evaluate(
agent=agent,
task=task,
execution_trace=execution_trace,
final_output=final_output
final_output=final_output,
)
result.metrics[evaluator.metric_category] = score
self.emit_evaluation_completed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, metric_category=evaluator.metric_category, score=score)
self.emit_evaluation_completed_event(
agent_role=agent.role,
agent_id=str(agent.id),
task_id=task_id,
metric_category=evaluator.metric_category,
score=score,
)
except Exception as e:
self.emit_evaluation_failed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, error=str(e))
self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}")
self.emit_evaluation_failed_event(
agent_role=agent.role,
agent_id=str(agent.id),
task_id=task_id,
error=str(e),
)
self.console_formatter.print(
f"Error in {evaluator.metric_category.value} evaluator: {str(e)}"
)
return result
def emit_evaluation_started_event(self, agent_role: str, agent_id: str, task_id: str | None = None):
def emit_evaluation_started_event(
self, agent_role: str, agent_id: str, task_id: str | None = None
):
crewai_event_bus.emit(
self,
AgentEvaluationStartedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration)
AgentEvaluationStartedEvent(
agent_role=agent_role,
agent_id=agent_id,
task_id=task_id,
iteration=self._execution_state.iteration,
),
)
def emit_evaluation_completed_event(self, agent_role: str, agent_id: str, task_id: str | None = None, metric_category: MetricCategory | None = None, score: EvaluationScore | None = None):
def emit_evaluation_completed_event(
self,
agent_role: str,
agent_id: str,
task_id: str | None = None,
metric_category: MetricCategory | None = None,
score: EvaluationScore | None = None,
):
crewai_event_bus.emit(
self,
AgentEvaluationCompletedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, metric_category=metric_category, score=score)
AgentEvaluationCompletedEvent(
agent_role=agent_role,
agent_id=agent_id,
task_id=task_id,
iteration=self._execution_state.iteration,
metric_category=metric_category,
score=score,
),
)
def emit_evaluation_failed_event(self, agent_role: str, agent_id: str, error: str, task_id: str | None = None):
def emit_evaluation_failed_event(
self, agent_role: str, agent_id: str, error: str, task_id: str | None = None
):
crewai_event_bus.emit(
self,
AgentEvaluationFailedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, error=error)
AgentEvaluationFailedEvent(
agent_role=agent_role,
agent_id=agent_id,
task_id=task_id,
iteration=self._execution_state.iteration,
error=error,
),
)
def create_default_evaluator(agents: list[Agent], llm: None = None):
from crewai.experimental.evaluation import (
GoalAlignmentEvaluator,
@@ -230,7 +344,7 @@ def create_default_evaluator(agents: list[Agent], llm: None = None):
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
ReasoningEfficiencyEvaluator
ReasoningEfficiencyEvaluator,
)
evaluators = [


@@ -1,14 +1,15 @@
import abc
import enum
from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.llms.base_llm import BaseLLM
from crewai.task import Task
from crewai.llm import BaseLLM
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.llm_utils import create_default_llm, create_llm
class MetricCategory(enum.Enum):
GOAL_ALIGNMENT = "goal_alignment"
@@ -18,8 +19,8 @@ class MetricCategory(enum.Enum):
PARAMETER_EXTRACTION = "parameter_extraction"
TOOL_INVOCATION = "tool_invocation"
def title(self):
return self.value.replace('_', ' ').title()
def title(self) -> str:
return self.value.replace("_", " ").title()
class EvaluationScore(BaseModel):
@@ -27,15 +28,13 @@ class EvaluationScore(BaseModel):
default=5.0,
description="Numeric score from 0-10 where 0 is worst and 10 is best, None if not applicable",
ge=0.0,
le=10.0
le=10.0,
)
feedback: str = Field(
default="",
description="Detailed feedback explaining the evaluation score"
default="", description="Detailed feedback explaining the evaluation score"
)
raw_response: str | None = Field(
default=None,
description="Raw response from the evaluator (e.g., LLM)"
default=None, description="Raw response from the evaluator (e.g., LLM)"
)
def __str__(self) -> str:
@@ -46,7 +45,9 @@ class EvaluationScore(BaseModel):
class BaseEvaluator(abc.ABC):
def __init__(self, llm: BaseLLM | None = None):
self.llm: BaseLLM | None = create_llm(llm)
self.llm: BaseLLM | None = (
create_llm(llm) if llm is not None else create_default_llm()
)
@property
@abc.abstractmethod
@@ -57,7 +58,7 @@ class BaseEvaluator(abc.ABC):
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
execution_trace: dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
@@ -67,9 +68,8 @@ class BaseEvaluator(abc.ABC):
class AgentEvaluationResult(BaseModel):
agent_id: str = Field(description="ID of the evaluated agent")
task_id: str = Field(description="ID of the task that was executed")
metrics: Dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict,
description="Evaluation scores for each metric category"
metrics: dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict, description="Evaluation scores for each metric category"
)
@@ -81,33 +81,23 @@ class AggregationStrategy(Enum):
class AgentAggregatedEvaluationResult(BaseModel):
agent_id: str = Field(
default="",
description="ID of the agent"
)
agent_role: str = Field(
default="",
description="Role of the agent"
)
agent_id: str = Field(default="", description="ID of the agent")
agent_role: str = Field(default="", description="Role of the agent")
task_count: int = Field(
default=0,
description="Number of tasks included in this aggregation"
default=0, description="Number of tasks included in this aggregation"
)
aggregation_strategy: AggregationStrategy = Field(
default=AggregationStrategy.SIMPLE_AVERAGE,
description="Strategy used for aggregation"
description="Strategy used for aggregation",
)
metrics: Dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict,
description="Aggregated metrics across all tasks"
metrics: dict[MetricCategory, EvaluationScore] = Field(
default_factory=dict, description="Aggregated metrics across all tasks"
)
task_results: List[str] = Field(
default_factory=list,
description="IDs of tasks included in this aggregation"
task_results: list[str] = Field(
default_factory=list, description="IDs of tasks included in this aggregation"
)
overall_score: Optional[float] = Field(
default=None,
description="Overall score for this agent"
default=None, description="Overall score for this agent"
)
def __str__(self) -> str:
@@ -119,7 +109,7 @@ class AgentAggregatedEvaluationResult(BaseModel):
result += f"\n\n- {category.value.upper()}: {score.score}/10\n"
if score.feedback:
detailed_feedback = "\n ".join(score.feedback.split('\n'))
detailed_feedback = "\n ".join(score.feedback.split("\n"))
result += f" {detailed_feedback}\n"
return result
return result


@@ -3,18 +3,28 @@ from typing import Dict, Any, List
from rich.table import Table
from rich.box import HEAVY_EDGE, ROUNDED
from collections.abc import Sequence
from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy, AgentEvaluationResult, MetricCategory
from crewai.experimental.evaluation.base_evaluator import (
AgentAggregatedEvaluationResult,
AggregationStrategy,
AgentEvaluationResult,
MetricCategory,
)
from crewai.experimental.evaluation import EvaluationScore
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.llm_utils import create_llm
class EvaluationDisplayFormatter:
def __init__(self):
self.console_formatter = ConsoleFormatter()
def display_evaluation_with_feedback(self, iterations_results: Dict[int, Dict[str, List[Any]]]):
def display_evaluation_with_feedback(
self, iterations_results: Dict[int, Dict[str, List[Any]]]
):
if not iterations_results:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
self.console_formatter.print(
"[yellow]No evaluation results to display[/yellow]"
)
return
all_agent_roles: set[str] = set()
@@ -22,7 +32,9 @@ class EvaluationDisplayFormatter:
all_agent_roles.update(iter_results.keys())
for agent_role in sorted(all_agent_roles):
self.console_formatter.print(f"\n[bold cyan]Agent: {agent_role}[/bold cyan]")
self.console_formatter.print(
f"\n[bold cyan]Agent: {agent_role}[/bold cyan]"
)
for iter_num, results in sorted(iterations_results.items()):
if agent_role not in results or not results[agent_role]:
@@ -62,9 +74,7 @@ class EvaluationDisplayFormatter:
table.add_section()
table.add_row(
metric.title(),
score_text,
evaluation_score.feedback or ""
metric.title(), score_text, evaluation_score.feedback or ""
)
if aggregated_result.overall_score is not None:
@@ -82,19 +92,26 @@ class EvaluationDisplayFormatter:
table.add_row(
"Overall Score",
f"[{overall_color}]{overall_score:.1f}[/]",
"Overall agent evaluation score"
"Overall agent evaluation score",
)
self.console_formatter.print(table)
def display_summary_results(self, iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]]):
def display_summary_results(
self,
iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]],
):
if not iterations_results:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
self.console_formatter.print(
"[yellow]No evaluation results to display[/yellow]"
)
return
self.console_formatter.print("\n")
table = Table(title="Agent Performance Scores \n (1-10 Higher is better)", box=HEAVY_EDGE)
table = Table(
title="Agent Performance Scores \n (1-10 Higher is better)", box=HEAVY_EDGE
)
table.add_column("Agent/Metric", style="cyan")
@@ -123,11 +140,14 @@ class EvaluationDisplayFormatter:
agent_id=agent_id,
agent_role=agent_role,
results=agent_results,
strategy=AggregationStrategy.SIMPLE_AVERAGE
strategy=AggregationStrategy.SIMPLE_AVERAGE,
)
valid_scores = [score.score for score in aggregated_result.metrics.values()
if score.score is not None]
valid_scores = [
score.score
for score in aggregated_result.metrics.values()
if score.score is not None
]
if valid_scores:
avg_score = sum(valid_scores) / len(valid_scores)
agent_scores_by_iteration[iter_num] = avg_score
@@ -137,7 +157,9 @@ class EvaluationDisplayFormatter:
if not agent_scores_by_iteration:
continue
avg_across_iterations = sum(agent_scores_by_iteration.values()) / len(agent_scores_by_iteration)
avg_across_iterations = sum(agent_scores_by_iteration.values()) / len(
agent_scores_by_iteration
)
row = [f"[bold]{agent_role}[/bold]"]
@@ -178,9 +200,13 @@ class EvaluationDisplayFormatter:
row = [f" - {metric.title()}"]
for iter_num in sorted(iterations_results.keys()):
if (iter_num in agent_metrics_by_iteration and
metric in agent_metrics_by_iteration[iter_num]):
metric_score = agent_metrics_by_iteration[iter_num][metric].score
if (
iter_num in agent_metrics_by_iteration
and metric in agent_metrics_by_iteration[iter_num]
):
metric_score = agent_metrics_by_iteration[iter_num][
metric
].score
if metric_score is not None:
metric_scores.append(metric_score)
if metric_score >= 8.0:
@@ -225,7 +251,9 @@ class EvaluationDisplayFormatter:
results: Sequence[AgentEvaluationResult],
strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE,
) -> AgentAggregatedEvaluationResult:
metrics_by_category: dict[MetricCategory, list[EvaluationScore]] = defaultdict(list)
metrics_by_category: dict[MetricCategory, list[EvaluationScore]] = defaultdict(
list
)
for result in results:
for metric_name, evaluation_score in result.metrics.items():
@@ -246,19 +274,20 @@ class EvaluationDisplayFormatter:
metric=category.title(),
feedbacks=feedbacks,
scores=[s.score for s in scores],
strategy=strategy
strategy=strategy,
)
else:
feedback_summary = feedbacks[0]
aggregated_metrics[category] = EvaluationScore(
score=avg_score,
feedback=feedback_summary
score=avg_score, feedback=feedback_summary
)
overall_score = None
if aggregated_metrics:
valid_scores = [m.score for m in aggregated_metrics.values() if m.score is not None]
valid_scores = [
m.score for m in aggregated_metrics.values() if m.score is not None
]
if valid_scores:
overall_score = sum(valid_scores) / len(valid_scores)
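Aggregation groups scores by metric category, averages the non-None scores within each category, then averages the per-category means into the overall score. Stripped of the pydantic models, the core computation reduces to this sketch over hypothetical plain-dict inputs:

from collections import defaultdict
from typing import Optional

def aggregate(
    results: list[dict[str, Optional[float]]],
) -> tuple[dict[str, float], Optional[float]]:
    by_category: dict[str, list[float]] = defaultdict(list)
    for result in results:
        for category, score in result.items():
            if score is not None:  # None scores are skipped, not counted as zero
                by_category[category].append(score)
    metrics = {c: sum(v) / len(v) for c, v in by_category.items()}
    overall = sum(metrics.values()) / len(metrics) if metrics else None
    return metrics, overall

metrics, overall = aggregate([
    {"goal_alignment": 8.0, "tool_selection": 6.0},
    {"goal_alignment": 9.0, "tool_selection": None},
])
assert metrics == {"goal_alignment": 8.5, "tool_selection": 6.0} and overall == 7.25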
@@ -268,7 +297,7 @@ class EvaluationDisplayFormatter:
metrics=aggregated_metrics,
overall_score=overall_score,
task_count=len(results),
aggregation_strategy=strategy
aggregation_strategy=strategy,
)
def _summarize_feedbacks(
@@ -277,10 +306,12 @@ class EvaluationDisplayFormatter:
metric: str,
feedbacks: List[str],
scores: List[float | None],
strategy: AggregationStrategy
strategy: AggregationStrategy,
) -> str:
if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks):
return "\n\n".join([f"Feedback {i+1}: {fb}" for i, fb in enumerate(feedbacks)])
return "\n\n".join(
[f"Feedback {i+1}: {fb}" for i, fb in enumerate(feedbacks)]
)
try:
llm = create_llm()
@@ -290,20 +321,26 @@ class EvaluationDisplayFormatter:
if len(feedback) > 500:
feedback = feedback[:500] + "..."
score_text = f"{score:.1f}" if score is not None else "N/A"
formatted_feedbacks.append(f"Feedback #{i+1} (Score: {score_text}):\n{feedback}")
formatted_feedbacks.append(
f"Feedback #{i+1} (Score: {score_text}):\n{feedback}"
)
all_feedbacks = "\n\n" + "\n\n---\n\n".join(formatted_feedbacks)
strategy_guidance = ""
if strategy == AggregationStrategy.BEST_PERFORMANCE:
strategy_guidance = "Focus on the highest-scoring aspects and strengths demonstrated."
strategy_guidance = (
"Focus on the highest-scoring aspects and strengths demonstrated."
)
elif strategy == AggregationStrategy.WORST_PERFORMANCE:
strategy_guidance = "Focus on areas that need improvement and common issues across tasks."
else:
strategy_guidance = "Provide a balanced analysis of strengths and weaknesses across all tasks."
prompt = [
{"role": "system", "content": f"""You are an expert evaluator creating a comprehensive summary of agent performance feedback.
{
"role": "system",
"content": f"""You are an expert evaluator creating a comprehensive summary of agent performance feedback.
Your job is to synthesize multiple feedback points about the same metric across different tasks.
Create a concise, insightful summary that captures the key patterns and themes from all feedback.
@@ -315,14 +352,18 @@ class EvaluationDisplayFormatter:
3. Highlighting patterns across tasks
4. 150-250 words in length
The summary should be directly usable as final feedback for the agent's performance on this metric."""},
{"role": "user", "content": f"""I need a synthesized summary of the following feedback for:
The summary should be directly usable as final feedback for the agent's performance on this metric.""",
},
{
"role": "user",
"content": f"""I need a synthesized summary of the following feedback for:
Agent Role: {agent_role}
Metric: {metric.title()}
{all_feedbacks}
"""}
""",
},
]
assert llm is not None
response = llm.call(prompt)
@@ -330,4 +371,6 @@ class EvaluationDisplayFormatter:
return response
except Exception:
return "Synthesized from multiple tasks: " + "\n\n".join([f"- {fb[:500]}..." for fb in feedbacks])
return "Synthesized from multiple tasks: " + "\n\n".join(
[f"- {fb[:500]}..." for fb in feedbacks]
)
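The summarizer above degrades gracefully: short feedback lists are joined verbatim, longer ones go through an LLM, and any LLM failure falls back to truncated raw feedback. A condensed sketch of that pattern, where llm stands in for whatever create_llm() returns:

# Sketch of the summarize-with-fallback pattern (llm is a stand-in with a .call method).
def summarize(feedbacks: list[str], llm) -> str:
    if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks):
        return "\n\n".join(f"Feedback {i + 1}: {fb}" for i, fb in enumerate(feedbacks))
    try:
        return llm.call([{"role": "user", "content": "\n\n---\n\n".join(feedbacks)}])
    except Exception:
        # Never fail the evaluation over a summarization error.
        return "Synthesized from multiple tasks: " + "\n\n".join(
            f"- {fb[:500]}..." for fb in feedbacks
        )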


@@ -5,25 +5,23 @@ from collections.abc import Sequence
from crewai.agent import Agent
from crewai.task import Task
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
from crewai.utilities.events.agent_events import (
from crewai.events.base_event_listener import BaseEventListener
from crewai.events.event_bus import CrewAIEventsBus
from crewai.events.types.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
LiteAgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent
LiteAgentExecutionCompletedEvent,
)
from crewai.utilities.events.tool_usage_events import (
from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
ToolExecutionErrorEvent,
ToolSelectionErrorEvent,
ToolValidateInputErrorEvent
)
from crewai.utilities.events.llm_events import (
LLMCallStartedEvent,
LLMCallCompletedEvent
ToolValidateInputErrorEvent,
)
from crewai.events.types.llm_events import LLMCallStartedEvent, LLMCallCompletedEvent
class EvaluationTraceCallback(BaseEventListener):
"""Event listener for collecting execution traces for evaluation.
@@ -68,27 +66,49 @@ class EvaluationTraceCallback(BaseEventListener):
@event_bus.on(ToolUsageFinishedEvent)
def on_tool_completed(source, event: ToolUsageFinishedEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.output, success=True)
self.on_tool_use(
event.tool_name, event.tool_args, event.output, success=True
)
@event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="usage_error")
self.on_tool_use(
event.tool_name,
event.tool_args,
event.error,
success=False,
error_type="usage_error",
)
@event_bus.on(ToolExecutionErrorEvent)
def on_tool_execution_error(source, event: ToolExecutionErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="execution_error")
self.on_tool_use(
event.tool_name,
event.tool_args,
event.error,
success=False,
error_type="execution_error",
)
@event_bus.on(ToolSelectionErrorEvent)
def on_tool_selection_error(source, event: ToolSelectionErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="selection_error")
self.on_tool_use(
event.tool_name,
event.tool_args,
event.error,
success=False,
error_type="selection_error",
)
@event_bus.on(ToolValidateInputErrorEvent)
def on_tool_validate_input_error(source, event: ToolValidateInputErrorEvent):
self.on_tool_use(event.tool_name, event.tool_args, event.error,
success=False, error_type="validation_error")
self.on_tool_use(
event.tool_name,
event.tool_args,
event.error,
success=False,
error_type="validation_error",
)
@event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
@@ -99,7 +119,7 @@ class EvaluationTraceCallback(BaseEventListener):
self.on_llm_call_end(event.messages, event.response)
def on_lite_agent_start(self, agent_info: dict[str, Any]):
self.current_agent_id = agent_info['id']
self.current_agent_id = agent_info["id"]
self.current_task_id = "lite_task"
trace_key = f"{self.current_agent_id}_{self.current_task_id}"
@@ -110,7 +130,7 @@ class EvaluationTraceCallback(BaseEventListener):
tool_uses=[],
llm_calls=[],
start_time=datetime.now(),
final_output=None
final_output=None,
)
def _init_trace(self, trace_key: str, **kwargs: Any):
@@ -128,7 +148,7 @@ class EvaluationTraceCallback(BaseEventListener):
tool_uses=[],
llm_calls=[],
start_time=datetime.now(),
final_output=None
final_output=None,
)
def on_agent_finish(self, agent: Agent, task: Task, output: Any):
@@ -151,8 +171,14 @@ class EvaluationTraceCallback(BaseEventListener):
self._reset_current()
def on_tool_use(self, tool_name: str, tool_args: dict[str, Any] | str, result: Any,
success: bool = True, error_type: str | None = None):
def on_tool_use(
self,
tool_name: str,
tool_args: dict[str, Any] | str,
result: Any,
success: bool = True,
error_type: str | None = None,
):
if not self.current_agent_id or not self.current_task_id:
return
@@ -163,7 +189,7 @@ class EvaluationTraceCallback(BaseEventListener):
"args": tool_args,
"result": result,
"success": success,
"timestamp": datetime.now()
"timestamp": datetime.now(),
}
# Add error information if applicable
@@ -173,7 +199,11 @@ class EvaluationTraceCallback(BaseEventListener):
self.traces[trace_key]["tool_uses"].append(tool_use)
def on_llm_call_start(self, messages: str | Sequence[dict[str, Any]] | None, tools: Sequence[dict[str, Any]] | None = None):
def on_llm_call_start(
self,
messages: str | Sequence[dict[str, Any]] | None,
tools: Sequence[dict[str, Any]] | None = None,
):
if not self.current_agent_id or not self.current_task_id:
return
@@ -186,10 +216,12 @@ class EvaluationTraceCallback(BaseEventListener):
"tools": tools,
"start_time": datetime.now(),
"response": None,
"end_time": None
"end_time": None,
}
def on_llm_call_end(self, messages: str | list[dict[str, Any]] | None, response: Any):
def on_llm_call_end(
self, messages: str | list[dict[str, Any]] | None, response: Any
):
if not self.current_agent_id or not self.current_task_id:
return
@@ -213,7 +245,7 @@ class EvaluationTraceCallback(BaseEventListener):
"response": response,
"start_time": start_time,
"end_time": current_time,
"total_tokens": total_tokens
"total_tokens": total_tokens,
}
self.traces[trace_key]["llm_calls"].append(llm_call)
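Each trace record keyed by agent and task id accumulates tool and LLM call entries as the handlers above fire. An illustrative sketch of the resulting shape (field values are placeholders):

# Illustrative shape of one collected trace (values are placeholders).
trace = {
    "tool_uses": [
        {"tool": "search", "args": {"q": "..."}, "result": "...",
         "success": False, "error_type": "usage_error", "timestamp": "..."},
    ],
    "llm_calls": [
        {"messages": "...", "tools": None, "response": "...",
         "start_time": "...", "end_time": "...", "total_tokens": 0},
    ],
    "start_time": "...",
    "final_output": None,
}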
@@ -227,7 +259,7 @@ class EvaluationTraceCallback(BaseEventListener):
def create_evaluation_callbacks() -> EvaluationTraceCallback:
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.events.event_bus import crewai_event_bus
callback = EvaluationTraceCallback()
callback.setup_listeners(crewai_event_bus)
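A hedged usage sketch: the factory wires the listener to the shared event bus, a run populates the traces, and the caller reads them back afterwards:

# Sketch: collect traces during a run, then inspect them (uses the factory above).
callbacks = create_evaluation_callbacks()
# ... execute a crew or lite agent here; events populate callbacks.traces ...
for trace_key, trace in callbacks.traces.items():
    print(trace_key, len(trace["tool_uses"]), len(trace["llm_calls"]))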


@@ -25,8 +25,8 @@ from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.types import FlowExecutionData
from crewai.flow.utils import get_possible_return_constants
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.flow_events import (
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFinishedEvent,
FlowPlotEvent,
@@ -35,10 +35,10 @@ from crewai.utilities.events.flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.utilities.events.listeners.tracing.trace_listener import (
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.utilities.events.listeners.tracing.utils import (
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled,
)
from crewai.utilities.printer import Printer
@@ -474,6 +474,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._method_outputs: List[Any] = [] # List to store all method outputs
self._completed_methods: Set[str] = set() # Track completed methods for reload
self._persistence: Optional[FlowPersistence] = persistence
self._is_execution_resuming: bool = False
# Initialize state with initial values
self._state = self._create_initial_state()
@@ -829,6 +830,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Clear completed methods and outputs for a fresh start
self._completed_methods.clear()
self._method_outputs.clear()
else:
# We're restoring from persistence, set the flag
self._is_execution_resuming = True
if inputs:
# Override the id in the state if it exists in inputs
@@ -880,6 +884,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
]
await asyncio.gather(*tasks)
# Clear the resumption flag after initial execution completes
self._is_execution_resuming = False
final_output = self._method_outputs[-1] if self._method_outputs else None
crewai_event_bus.emit(
@@ -916,19 +923,23 @@ class Flow(Generic[T], metaclass=FlowMeta):
- Automatically injects crewai_trigger_payload if available in flow inputs
"""
if start_method_name in self._completed_methods:
last_output = self._method_outputs[-1] if self._method_outputs else None
await self._execute_listeners(start_method_name, last_output)
return
if self._is_execution_resuming:
# During resumption, skip execution but continue listeners
last_output = self._method_outputs[-1] if self._method_outputs else None
await self._execute_listeners(start_method_name, last_output)
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(start_method_name)
method = self._methods[start_method_name]
enhanced_method = self._inject_trigger_payload_for_start_method(method)
result = await self._execute_method(
start_method_name, enhanced_method
)
result = await self._execute_method(start_method_name, enhanced_method)
await self._execute_listeners(start_method_name, result)
def _inject_trigger_payload_for_start_method(self, original_method: Callable) -> Callable:
def _inject_trigger_payload_for_start_method(
self, original_method: Callable
) -> Callable:
def prepare_kwargs(*args, **kwargs):
inputs = baggage.get_baggage("flow_inputs") or {}
trigger_payload = inputs.get("crewai_trigger_payload")
@@ -941,15 +952,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
elif trigger_payload is not None:
self._log_flow_event(
f"Trigger payload available but {original_method.__name__} doesn't accept crewai_trigger_payload parameter",
color="yellow"
color="yellow",
)
return args, kwargs
if asyncio.iscoroutinefunction(original_method):
async def enhanced_method(*args, **kwargs):
args, kwargs = prepare_kwargs(*args, **kwargs)
return await original_method(*args, **kwargs)
else:
def enhanced_method(*args, **kwargs):
args, kwargs = prepare_kwargs(*args, **kwargs)
return original_method(*args, **kwargs)
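The wrapper above chooses an async or sync shim based on iscoroutinefunction, so the enhanced method keeps the original's calling convention. A minimal standalone sketch of that pattern:

# Sketch: wrap a callable without changing whether it is sync or async.
import asyncio
from typing import Callable

def wrap(method: Callable) -> Callable:
    def prepare(*args, **kwargs):
        # Mutate args/kwargs here, e.g. inject a payload.
        return args, kwargs

    if asyncio.iscoroutinefunction(method):
        async def wrapped(*args, **kwargs):
            args, kwargs = prepare(*args, **kwargs)
            return await method(*args, **kwargs)
    else:
        def wrapped(*args, **kwargs):
            args, kwargs = prepare(*args, **kwargs)
            return method(*args, **kwargs)
    return wrapped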
@@ -1050,11 +1063,15 @@ class Flow(Generic[T], metaclass=FlowMeta):
for router_name in routers_triggered:
await self._execute_single_listener(router_name, result)
# After executing router, the router's result is the path
router_result = self._method_outputs[-1]
router_result = (
self._method_outputs[-1] if self._method_outputs else None
)
if router_result: # Only add non-None results
router_results.append(router_result)
current_trigger = (
router_result # Update for next iteration of router chain
str(router_result)
if router_result is not None
else "" # Update for next iteration of router chain
)
# Now execute normal listeners for all router results and the original trigger
@@ -1072,6 +1089,24 @@ class Flow(Generic[T], metaclass=FlowMeta):
]
await asyncio.gather(*tasks)
if current_trigger in router_results:
# Find start methods triggered by this router result
for method_name in self._start_methods:
# Check if this start method is triggered by the current trigger
if method_name in self._listeners:
condition_type, trigger_methods = self._listeners[
method_name
]
if current_trigger in trigger_methods:
# Only execute if this is a cycle (method was already completed)
if method_name in self._completed_methods:
# For router-triggered start methods in cycles, temporarily clear resumption flag
# to allow cyclic execution
was_resuming = self._is_execution_resuming
self._is_execution_resuming = False
await self._execute_start_method(method_name)
self._is_execution_resuming = was_resuming
def _find_triggered_methods(
self, trigger_method: str, router_only: bool
) -> List[str]:
@@ -1109,6 +1144,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
if router_only != is_router:
continue
if not router_only and listener_name in self._start_methods:
continue
if condition_type == "OR":
# If the trigger_method matches any in methods, run this
if trigger_method in methods:
@@ -1158,10 +1196,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
Catches and logs any exceptions during execution, preventing
individual listener failures from breaking the entire flow.
"""
# TODO: greyson fix
# if listener_name in self._completed_methods:
# await self._execute_listeners(listener_name, None)
# return
if listener_name in self._completed_methods:
if self._is_execution_resuming:
# During resumption, skip execution but continue listeners
await self._execute_listeners(listener_name, None)
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(listener_name)
try:
method = self._methods[listener_name]
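The _is_execution_resuming flag added above lets a restored flow replay its listener chain without re-running methods that already completed, while cyclic flows still re-execute via the discard path. A hedged sketch of resuming a persisted flow, where my_persistence and saved_state_id are placeholders for a FlowPersistence implementation and a stored state id:

# Hedged sketch: resuming a persisted flow; the persistence backend is illustrative.
from crewai.flow.flow import Flow, listen, start

class MyFlow(Flow):
    @start()
    def fetch(self):
        return "data"

    @listen(fetch)
    def process(self, result):
        return result.upper()

flow = MyFlow(persistence=my_persistence)
# Passing a known id in inputs restores state; completed methods are skipped,
# but their listeners still run so the flow reaches its original frontier.
flow.kickoff(inputs={"id": saved_state_id})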


@@ -1,5 +1,5 @@
import os
from typing import Any, Dict, List, Optional
from typing import Any, Optional
from pydantic import BaseModel, ConfigDict, Field
@@ -18,20 +18,20 @@ class Knowledge(BaseModel):
embedder: Optional[Dict[str, Any]] = None
"""
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
sources: list[BaseKnowledgeSource] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: Optional[KnowledgeStorage] = Field(default=None)
embedder: Optional[Dict[str, Any]] = None
embedder: Optional[dict[str, Any]] = None
collection_name: Optional[str] = None
def __init__(
self,
collection_name: str,
sources: List[BaseKnowledgeSource],
embedder: Optional[Dict[str, Any]] = None,
sources: list[BaseKnowledgeSource],
embedder: Optional[dict[str, Any]] = None,
storage: Optional[KnowledgeStorage] = None,
**data,
):
**data: Any,
) -> None:
super().__init__(**data)
if storage:
self.storage = storage
@@ -43,8 +43,8 @@ class Knowledge(BaseModel):
self.storage.initialize_knowledge_storage()
def query(
self, query: List[str], results_limit: int = 3, score_threshold: float = 0.35
) -> List[Dict[str, Any]]:
self, query: list[str], results_limit: int = 3, score_threshold: float = 0.35
) -> list[dict[str, Any]]:
"""
Query across all knowledge sources to find the most relevant information.
Returns the results_limit most relevant chunks.
@@ -62,7 +62,7 @@ class Knowledge(BaseModel):
)
return results
def add_sources(self):
def add_sources(self) -> None:
try:
for source in self.sources:
source.storage = self.storage
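With the annotations moved to builtin generics, the query surface is unchanged. A hedged usage sketch (the string source and its content are illustrative):

# Sketch: build a Knowledge collection and query it.
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource

knowledge = Knowledge(
    collection_name="docs",
    sources=[StringKnowledgeSource(content="CrewAI flows support persistence.")],
)
results = knowledge.query(["does crewai support persistence?"], results_limit=3)
for chunk in results:
    print(chunk)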
