Compare commits

...

53 Commits

Author SHA1 Message Date
Lorenze Jay
7c162411b7 chore: update crewai to 0.157.0 and crewai-tools dependency to version 0.60.0 (#3281)
* chore: update crewai-tools dependency to version 0.60.0

- Updated the `pyproject.toml` and `uv.lock` files to reflect the new version of `crewai-tools`.
- This change ensures compatibility with the latest features and improvements in the tools package.

* chore: bump CrewAI version to 0.157.0

- Updated the version in `__init__.py` to reflect the new release.
- Adjusted dependency versions in `pyproject.toml` files for crew, flow, and tool templates to ensure compatibility with the latest features and improvements in CrewAI.
- This change maintains consistency across the project and prepares for upcoming enhancements.
2025-08-06 14:47:50 -07:00
Lorenze Jay
8f4a6cc61c Lorenze/tracing v1 (#3279)
* initial setup

* feat: enhance CrewKickoffCompletedEvent to include total token usage

- Added total_tokens attribute to CrewKickoffCompletedEvent for better tracking of token usage during crew execution.
- Updated Crew class to emit total token usage upon kickoff completion.
- Removed obsolete context handler and execution context tracker files to streamline event handling.

* cleanup

* remove print statements for loggers

* feat: add CrewAI base URL and improve logging in tracing

- Introduced `CREWAI_BASE_URL` constant for easy access to the CrewAI application URL.
- Replaced print statements with logging in the `TraceSender` class for better error tracking.
- Enhanced the `TraceBatchManager` to provide default values for flow names and removed unnecessary comments.
- Implemented singleton pattern in `TraceCollectionListener` to ensure a single instance is used.
- Added a new test case to verify that the trace listener correctly collects events during crew execution.
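The singleton behaviour described for `TraceCollectionListener` can be illustrated with a minimal sketch. `SingletonListener` and its `events` attribute are hypothetical stand-ins, not the project's actual class; the point is only that repeated construction yields one shared instance.

```python
import threading


class SingletonListener:
    """Hypothetical listener: every construction returns the same instance."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking so concurrent callers still share one instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance.events = []  # state shared by all callers
                    cls._instance = instance
        return cls._instance


first = SingletonListener()
second = SingletonListener()
first.events.append("crew_kickoff_started")
```

Because `first` and `second` are the same object, an event recorded through one is visible through the other.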

* clear

* fix: update datetime serialization in tracing interfaces

- Removed the 'Z' suffix from datetime serialization in TraceSender and TraceEvent to ensure consistent ISO format.
- Added new test cases to validate the functionality of the TraceBatchManager and event collection during crew execution.
- Introduced fixtures to clear event bus listeners before each test to maintain isolation.

* test: enhance tracing tests with mock authentication token

- Added a mock authentication token to the tracing tests to ensure proper setup and event collection.
- Updated test methods to include the mock token, improving isolation and reliability of tests related to the TraceListener and BatchManager.
- Ensured that the tests validate the correct behavior of event collection during crew execution.

* test: refactor tracing tests to improve mock usage

- Moved the mock authentication token patching inside the test class to enhance readability and maintainability.
- Updated test methods to remove unnecessary mock parameters, streamlining the test signatures.
- Ensured that the tests continue to validate the correct behavior of event collection during crew execution while improving isolation.

* test: refactor tracing tests for improved mock usage and consistency

- Moved mock authentication token patching into individual test methods for better clarity and maintainability.
- Corrected the backstory string in the `Agent` instantiation to fix a typo.
- Ensured that all tests validate the correct behavior of event collection during crew execution while enhancing isolation and readability.

* test: add new tracing test for disabled trace listener

- Introduced a new test case to verify that the trace listener does not make HTTP calls when tracing is disabled via environment variables.
- Enhanced existing tests by mocking PlusAPI HTTP calls to avoid authentication and network requests, improving test isolation and reliability.
- Updated the test setup to ensure proper initialization of the trace listener and its components during crew execution.

* refactor: update LLM class to utilize new completion function and improve cost calculation

- Replaced direct calls to `litellm.completion` with a new import for better clarity and maintainability.
- Introduced a new optional attribute `completion_cost` in the LLM class to track the cost of completions.
- Updated the handling of completion responses to ensure accurate cost calculations and improved error handling.
- Removed outdated test cassettes for gemini models to streamline test suite and avoid redundancy.
- Enhanced existing tests to reflect changes in the LLM class and ensure proper functionality.

* test: enhance tracing tests with additional request and response scenarios

- Added new test cases to validate the behavior of the trace listener and batch manager when handling 404 responses from the tracing API.
- Updated existing test cassettes to include detailed request and response structures, ensuring comprehensive coverage of edge cases.
- Improved mock setup to avoid unnecessary network calls and enhance test reliability.
- Ensured that the tests validate the correct behavior of event collection during crew execution, particularly in scenarios where the tracing service is unavailable.

* feat: enable conditional tracing based on environment variable

- Added support for enabling or disabling the trace listener based on the `CREWAI_TRACING_ENABLED` environment variable.
- Updated the `Crew` class to conditionally set up the trace listener only when tracing is enabled, improving performance and resource management.
- Refactored test cases to ensure proper cleanup of event bus listeners before and after each test, enhancing test reliability and isolation.
- Improved mock setup in tracing tests to validate the behavior of the trace listener when tracing is disabled.
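A rough sketch of gating a listener on `CREWAI_TRACING_ENABLED` follows. The helper name `is_tracing_enabled` and the assumption that only the value `true` (case-insensitive) enables tracing are illustrative; the commit does not state how the variable is parsed.

```python
import os


def is_tracing_enabled(env=None):
    """Return True only when CREWAI_TRACING_ENABLED is "true" (assumed convention)."""
    env = os.environ if env is None else env
    return env.get("CREWAI_TRACING_ENABLED", "false").strip().lower() == "true"


def setup_listeners(env=None):
    """Hypothetical setup step: register the trace listener only when enabled."""
    listeners = []
    if is_tracing_enabled(env):
        listeners.append("trace_listener")
    return listeners
```

Passing the environment as a parameter keeps the check easy to test without mutating `os.environ`.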

* fix: downgrade litellm version from 1.74.9 to 1.74.3

- Updated the `pyproject.toml` and `uv.lock` files to reflect the change in the `litellm` dependency version.
- This downgrade addresses compatibility issues and ensures stability in the project environment.

* refactor: improve tracing test setup by moving mock authentication token patching

- Removed the module-level patch for the authentication token and implemented a fixture to mock the token for all tests in the class, enhancing test isolation and readability.
- Updated the event bus clearing logic to ensure original handlers are restored after tests, improving reliability of the test environment.
- This refactor streamlines the test setup and ensures consistent behavior across tracing tests.

* test: enhance tracing test setup with comprehensive mock authentication

- Expanded the mock authentication token patching to cover all instances where `get_auth_token` is used across different modules, ensuring consistent behavior in tests.
- Introduced a new fixture to reset tracing singleton instances between tests, improving test isolation and reliability.
- This update enhances the overall robustness of the tracing tests by ensuring that all necessary components are properly mocked and reset, leading to more reliable test outcomes.

* just drop the test for now

* refactor: comment out completion-related code in LLM and LLM event classes

- Commented out the `completion` and `completion_cost` imports and their usage in the `LLM` class to prevent potential issues during execution.
- Updated the `LLMCallCompletedEvent` class to comment out the `response_cost` attribute, ensuring consistency with the changes in the LLM class.
- This refactor aims to streamline the code and prepare for future updates without affecting current functionality.

* refactor: update LLM response handling in LiteAgent

- Commented out the `response_cost` attribute in the LLM response handling to align with recent refactoring in the LLM class.
- This change aims to maintain consistency in the codebase and prepare for future updates without affecting current functionality.

* refactor: remove commented-out response cost attributes in LLM and LiteAgent

- Commented out the `response_cost` attribute in both the `LiteAgent` and `LLM` classes to maintain consistency with recent refactoring efforts.
- This change aligns with previous updates aimed at streamlining the codebase and preparing for future enhancements without impacting current functionality.

* bring back litellm upgrade version
2025-08-06 14:05:14 -07:00
633WHU
7dc86dc79a perf: optimize string operations with partition() over split()[0] (#3255)
Replace inefficient split()[0] operations with partition()[0] for better performance
when extracting the first part of a string before a delimiter.

Key improvements:
• Agent role processing: 29% faster with partition()
• Model provider extraction: 16% faster
• Console formatting: Improved responsiveness
• Better readability and explicit intent

Changes:
- agent_utils.py: Use partition('\n')[0] for agent role extraction
- console_formatter.py: Optimize agent role processing in logging
- llm_utils.py: Improve model provider parsing
- llm.py: Optimize model name parsing

Performance impact: 15-30% improvement in string processing operations
that are frequently used in agent execution and console output.

cliu_whu@yeah.net

Co-authored-by: chiliu <chiliu@paypal.com>
2025-08-06 15:04:53 -04:00
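The substitution described in commit 7dc86dc79a can be sketched as below. The sample strings are made up for illustration; the sketch shows only that both forms return the same prefix, and makes no claim about the timings quoted in the commit message.

```python
def first_part_split(text, delimiter):
    # Builds a list of every piece, then keeps only the first one.
    return text.split(delimiter)[0]


def first_part_partition(text, delimiter):
    # Stops at the first delimiter and returns a 3-tuple (head, sep, tail).
    return text.partition(delimiter)[0]


role = "Senior Researcher\nSecond line of the role"  # illustrative value
model = "openai/gpt-4o"                               # illustrative value

role_first_line = first_part_partition(role, "\n")
provider = first_part_partition(model, "/")
```

When the delimiter is absent, both forms return the whole string, so the swap does not change behaviour for inputs without a newline or slash.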
Vidit Ostwal
7ce20cfcc6 Dropping User Memory (#3225)
* Dropping User Memory

* Dropping checks for user memory

* Changed memory.mdx documentation to remove user memory.

* Flaky Test Case Maybe

* Drop memory_config

* Fixed test cases

* Fixed some test cases

* Changed docs

* Changed BR docs

* Docs fixing

* Fix minor doc

* Fix minor doc

* Fix minor doc

* Added fallback mechanism in Mem0
2025-08-06 13:08:10 -04:00
Mrunmay Shelar
1d9523c98f docs: add LangDB integration documentation (#3228)
docs: update LangDB links in observability documentation

- Removed references to the AI Gateway features in both English and Portuguese documentation.
- Updated the Model Catalog links to point to the new app.langdb.ai domain.
- Ensured consistency across both language versions of the documentation.
2025-08-06 11:13:58 -04:00
Lucas Gomide
9f1d7d1aa9 fix: allow persist Flow state with BaseModel entries (#3276)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-08-06 09:04:59 -04:00
Lucas Gomide
79b375f6fa build: bump LiteLLM to 1.74.9 (#3278)
2025-08-05 17:10:23 -04:00
Lucas Gomide
75752479c2 docs: add CLI config docs (#3275) 2025-08-05 15:24:34 -04:00
Lucas Gomide
477bc1f09e feat: add default value for crew.name (#3252)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-08-05 12:25:50 -04:00
Lucas Gomide
66567bdc2f Support Device authorization with Okta (#3271)
* feat: support oauth2 config for authentication

* refactor: improve OAuth2 settings management

The CLI now supports seamless integration with other authentication providers, since the client_id, issuer, and domain are now managed by the user

* feat: support okta Device Authorization flow

* chore: resolve linter issues

* test: fix tests

* test: adding tests for auth providers

* test: fix broken test

* refactor: adding WorkOS parameters as default auth settings

* chore: improve oauth2 attributes description

* refactor: simplify WorkOS getting values

* fix: ensure Auth0 parameters are set when overriding default auth provider

* chore: remove TODO Auth0 no longer provides default values

---------

Co-authored-by: Heitor Carvalho <heitor.scz@gmail.com>
2025-08-05 12:16:21 -04:00
Lucas Gomide
0b31bbe957 fix: enable word wrapping for long input tool (#3274) 2025-08-05 11:05:38 -04:00
Lucas Gomide
246cf588cd docs: updating MCP docs with connect_timeout attribute (#3273) 2025-08-05 10:27:18 -04:00
Heitor Carvalho
88ed91561f feat: add crewai config command group and tests (#3206)
2025-07-31 10:38:51 -04:00
Lorenze Jay
9a347ad458 chore: update crewai-tools dependency to version 0.59.0 and bump CrewAI version to 0.152.0 (#3244)
- Updated `crewai-tools` dependency from `0.58.0` to `0.59.0` in `pyproject.toml` and `uv.lock`.
- Bumped the version of the CrewAI library from `0.150.0` to `0.152.0` in `__init__.py`.
- Updated dependency versions in CLI templates for crew, flow, and tool projects to reflect the new CrewAI version.
2025-07-30 14:38:24 -07:00
Lucas Gomide
34c3075fdb fix: support to add memories to Mem0 with agent_id (#3217)
* fix: support to add memories to Mem0 with agent_id

* feat: removing memory_type checkings from Mem0Storage

* feat: ensure agent_id is always present while saving memory into Mem0

* fix: use OR operator when querying Mem0 memories with both user_id and agent_id
2025-07-30 11:56:46 -04:00
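One way to picture the OR-based query from commit 34c3075fdb is a small filter builder. The function name `build_search_filter` and the exact filter shape (`{"OR": [...]}` / `{"AND": [...]}`) are assumptions made for illustration; check the Mem0 client documentation before relying on them.

```python
def build_search_filter(user_id=None, agent_id=None):
    """Hypothetical helper: match memories saved under either identifier."""
    if user_id and agent_id:
        # Both identifiers known: a memory qualifies if it matches either one.
        return {"OR": [{"user_id": user_id}, {"agent_id": agent_id}]}
    if user_id:
        return {"AND": [{"user_id": user_id}]}
    if agent_id:
        return {"AND": [{"agent_id": agent_id}]}
    return {}


both = build_search_filter(user_id="alice", agent_id="researcher")
```

Using OR here means memories stored with only an `agent_id` are still returned when a `user_id` is also supplied.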
Vidit Ostwal
498e8dc6e8 Changed the import error to show missing module files (#2423)
* Fix issue #2421: Handle missing google.genai dependency gracefully

Co-Authored-By: Joe Moura <joao@crewai.com>

* Fix import sorting in test file

Co-Authored-By: Joe Moura <joao@crewai.com>

* Fix import sorting with ruff

Co-Authored-By: Joe Moura <joao@crewai.com>

* Removed unwanted test case

* Added dynamic catching for all the embedder function

* Dropped the comment

* Added test case

* Fixed Linting Issue

* Flaky test case in 3.13

* Test Case fixed

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
Co-authored-by: Lucas Gomide <lucaslg200@gmail.com>
2025-07-30 10:01:17 -04:00
Lorenze Jay
cb522cf500 Enhance Flow class to support custom flow names (#3234)
- Added an optional `name` attribute to the Flow class for better identification.
- Updated event emissions to utilize the new `name` attribute, ensuring accurate flow naming in events.
- Added tests to verify the correct flow name is set and emitted during flow execution.
2025-07-29 15:41:30 -07:00
Vini Brasil
017acc74f5 Add timezone to event timestamps (#3231)
Events were lacking timezone information, making them naive datetimes,
which can be ambiguous.
2025-07-28 17:09:06 -03:00
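The difference between naive and timezone-aware timestamps mentioned in commit 017acc74f5 can be seen in a short sketch. UTC is assumed here purely for illustration; the commit message does not say which timezone the events use.

```python
from datetime import datetime, timezone

# A naive datetime carries no offset, so the instant it refers to is ambiguous.
naive = datetime(2025, 7, 28, 17, 9, 6)

# An aware datetime pins the same wall-clock reading to a specific offset.
aware = datetime(2025, 7, 28, 17, 9, 6, tzinfo=timezone.utc)

naive_text = naive.isoformat()  # '2025-07-28T17:09:06'
aware_text = aware.isoformat()  # '2025-07-28T17:09:06+00:00'


def event_timestamp():
    """Hypothetical default factory for an event's timestamp field."""
    return datetime.now(timezone.utc)
```

The serialized aware value includes the `+00:00` offset, which lets a consumer in another timezone interpret it unambiguously.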
Greyson LaLonde
fab86d197a Refactor: Move RAG components to dedicated top-level module (#3222)
* Move RAG components to top-level module

- Create src/crewai/rag directory structure
- Move embeddings configurator from utilities to rag module
- Update imports across codebase and documentation
- Remove deprecated embedding files

* Remove empty knowledge/embedder directory
2025-07-25 10:55:31 -04:00
Vidit Ostwal
864e9bfb76 Changed the default value in Mem0 config (#3216)
* Changed the default value in Mem0 config

* Added regression test for this

* Fixed Linting issues
2025-07-24 13:20:18 -04:00
Lucas Gomide
d3b45d197c fix: remove crewai signup references, replaced by crewai login (#3213) 2025-07-24 07:47:35 -04:00
Manuka Yasas
579153b070 docs: fix incorrect model naming in Google Vertex AI documentation (#3189)
- Change model format from "gemini/gemini-1.5-pro-latest" to "gemini-1.5-pro-latest"
  in Vertex AI section examples
- Update both English and Portuguese documentation files
- Fixes incorrect provider prefix usage for Vertex AI models
- Ensures consistency with Vertex AI provider requirements

Files changed:
- docs/en/concepts/llms.mdx (line 272)
- docs/pt-BR/concepts/llms.mdx (line 270)

Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-07-23 16:58:57 -04:00
Lorenze Jay
b1fdcdfa6e chore: update dependencies and version in project files (#3212)
- Updated `crewai-tools` dependency from `0.55.0` to `0.58.0` in `pyproject.toml` and `uv.lock`.
- Added new packages `anthropic`, `browserbase`, `playwright`, `pyee`, and `stagehand` with their respective versions in `uv.lock`.
- Bumped the version of the CrewAI library from `0.148.0` to `0.150.0` in `__init__.py`.
- Updated dependency versions in CLI templates for crew, flow, and tool projects to reflect the new CrewAI version.
2025-07-23 11:03:50 -07:00
Mike Plachta
18d76a270c docs: add SerperScrapeWebsiteTool documentation and reorganize SerperDevTool setup instructions (#3211) 2025-07-23 12:12:59 -04:00
Vidit Ostwal
30541239ad Changed Mem0 Storage v1.1 -> v2 (#2893)
* Changed v1.1 -> v2

* Fixed Test Cases:

* Fixed linting issues

* Changed docs

* Refactored the storage

* Fixed test cases

* Fixing run-time checks

* Fixed Test Case

* Updated docs and added test case for custom categories

* Add the TODO back

* Minor Changes

* Added output_format in search

* Minor changes

* Added output_format and version in both search and save

* Small change

* Minor bugs

* Fixed test cases

* Changed docs

---------

Co-authored-by: Lucas Gomide <lucaslg200@gmail.com>
2025-07-23 08:30:52 -04:00
Tony Kipkemboi
9a65573955 Feature/update docs (#3205)
* docs: add create_directory parameter

* docs: remove string guardrails to focus on function guardrails

* docs: remove get help from docs.json

* docs: update pt-BR docs.json changes
2025-07-22 13:55:27 -04:00
Lucas Gomide
27623a1d01 feat: remove duplicate print on LLM call error (#3183)
By improving litellm handler error / outputs

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-07-21 22:08:07 -04:00
João Moura
2593242234 Adding Support to adhoc tool calling using the internal LLM class (#3195)
* Adding Support to adhoc tool calling using the internal LLM class

* fix type
2025-07-21 19:36:48 -03:00
Greyson LaLonde
2ab6c31544 chore: add deprecation notices to UserMemory (#3201)
- Mark UserMemory and UserMemoryItem for removal in v0.156.0 or 2025-08-04
- Update all references with deprecation warnings
- Users should migrate to ExternalMemory
2025-07-21 15:26:34 -04:00
Lucas Gomide
3c55c8a22a fix: append user message when last message is from assistant when using Ollama models (#3200)
Ollama doesn't support the last message being from 'assistant'.
We can drop this commit after merging https://github.com/BerriAI/litellm/pull/10917
2025-07-21 13:30:40 -04:00
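The workaround in commit 3c55c8a22a can be approximated as follows. The function name and the empty filler content are hypothetical; the commit message does not say what text the appended user message carries.

```python
def ensure_last_message_is_user(messages, filler_content=""):
    """Return a copy of `messages` that never ends on an assistant turn.

    `filler_content` is a placeholder assumption, not the project's actual text.
    """
    if messages and messages[-1].get("role") == "assistant":
        return [*messages, {"role": "user", "content": filler_content}]
    return list(messages)


conversation = [
    {"role": "user", "content": "Summarize the report."},
    {"role": "assistant", "content": "Here is a first draft."},
]
patched = ensure_last_message_is_user(conversation)
```

The input list is left untouched, so callers that reuse the original history for other providers are not affected.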
Ranuga Disansa
424433ff58 docs: Add Tavily Search & Extractor tools to Search-Research suite (#3146)
* docs: Add Tavily Search and Extractor tools documentation

* docs: Add Tavily Search and Extractor tools to the documentation

---------

Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-07-21 12:01:29 -04:00
Lucas Gomide
2fd99503ed build: upgrade LiteLLM to 1.74.3 (#3199) 2025-07-21 09:58:47 -04:00
Vidit Ostwal
942014962e fixed save method, changed the test cases (#3187)
* fixed save method, changed the test cases

* Linting fixed
2025-07-18 15:10:26 -04:00
Lucas Gomide
2ab79a7dd5 feat: drop unsupported stop parameter for LLM models automatically (#3184) 2025-07-18 13:54:28 -04:00
Lucas Gomide
27c449c9c4 test: remove workaround related to SQLite without FTS5 (#3179)
For more details check out [here](actions/runner-images#12576)
2025-07-18 09:37:15 -04:00
Vini Brasil
9737333ffd Use file lock around Chroma client initialization (#3181)
This commit fixes a bug with concurrent processes and Chroma where
`table collections already exists` (and similar) were raised.

https://cookbook.chromadb.dev/core/system_constraints/
2025-07-17 11:50:45 -03:00
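The idea of serializing client initialization across processes, as in commit 9737333ffd, can be sketched with the standard library alone. This is a POSIX-only illustration using `fcntl.flock`; the lock file name, `init_client`, and the marker file are hypothetical, and the project may well use a different locking library.

```python
import fcntl
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def file_lock(path):
    """Hold an exclusive advisory lock on `path` while the block runs (POSIX only)."""
    with open(path, "w") as handle:
        fcntl.flock(handle, fcntl.LOCK_EX)  # blocks until other holders release it
        try:
            yield
        finally:
            fcntl.flock(handle, fcntl.LOCK_UN)


def init_client(storage_dir):
    """Hypothetical stand-in for setup that must not run in two processes at once."""
    lock_path = os.path.join(storage_dir, "client-init.lock")
    marker = os.path.join(storage_dir, "initialized")
    with file_lock(lock_path):
        # Only the first process to take the lock performs the one-time setup.
        if not os.path.exists(marker):
            with open(marker, "w") as out:
                out.write("ok")
    return marker


storage = tempfile.mkdtemp()
marker_path = init_client(storage)
```

A second process calling `init_client` on the same directory waits at `flock` and then finds the setup already done instead of racing to create it.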
Lucas Gomide
bf248d5118 docs: fix neatlogs documentation (#3171)
2025-07-16 21:18:04 -04:00
Lorenze Jay
2490e8cd46 Update CrewAI version to 0.148.0 in project templates and dependencies (#3172)
* Update CrewAI version to 0.148.0 in project templates and dependencies

* Update crewai-tools dependency to version 0.55.0 in pyproject.toml and uv.lock for improved functionality and performance.
2025-07-16 12:36:43 -07:00
Lucas Gomide
9b67e5a15f Emit events about Agent eval (#3168)
* feat: emit events about Agent Eval

We are triggering events when an evaluation has started/completed/failed

* style: fix type checking issues
2025-07-16 13:18:59 -04:00
Lucas Gomide
6ebb6c9b63 Supporting eval single Agent/LiteAgent (#3167)
* refactor: rely on task completion event to evaluate agents

* feat: remove Crew dependency to evaluate agent

* feat: drop execution_context in AgentEvaluator

* chore: drop experimental Agent Eval feature from stable crew.test

* feat: support eval LiteAgent

* resolve linter issues
2025-07-15 09:22:41 -04:00
Lucas Gomide
53f674be60 chore: remove evaluation folder (#3159)
This folder was moved to `experimental` folder
2025-07-15 08:30:20 -04:00
Paras Sakarwal
11717a5213 docs: added integration with neatlogs (#3138)
2025-07-14 11:08:24 -04:00
Lucas Gomide
b6d699f764 Implement thread-safe AgentEvaluator (#3157)
* refactor: implement thread-safe AgentEvaluator with hybrid state management

* chore: remove useless comments
2025-07-14 10:05:42 -04:00
Lucas Gomide
5b15061b87 test: add test helper to assert Agent Experiments (#3156) 2025-07-14 09:24:49 -04:00
Lucas Gomide
1b6b2b36d9 Introduce Evaluator Experiment (#3133)
* feat: add exchanged messages in LLMCallCompletedEvent

* feat: add GoalAlignment metric for Agent evaluation

* feat: add SemanticQuality metric for Agent evaluation

* feat: add Tool Metrics for Agent evaluation

* feat: add Reasoning Metrics for Agent evaluation, still in progress

* feat: add AgentEvaluator class

This class will evaluate an Agent's results and report them to the user

* fix: do not evaluate Agent by default

This is an experimental feature; we still need to refine it further

* test: add Agent eval tests

* fix: render all feedback per iteration

* style: resolve linter issues

* style: fix mypy issues

* fix: allow messages to be empty on LLMCallCompletedEvent

* feat: add Experiment evaluation framework with baseline comparison

* fix: reset evaluator for each experiment iteration

* fix: fix track of new test cases

* chore: split Experimental evaluation classes

* refactor: remove unused method

* refactor: isolate Console print in a dedicated class

* fix: make crew required to run an experiment

* fix: use time-aware to define experiment result

* test: add tests for Evaluator Experiment

* style: fix linter issues

* fix: encode string before hashing

* style: resolve linter issues

* feat: add experimental folder for beta features (#3141)

* test: move tests to experimental folder
2025-07-14 09:06:45 -04:00
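The "encode string before hashing" fix listed in commit 1b6b2b36d9 reflects a general Python rule: `hashlib` functions accept bytes, not `str`. The sketch below assumes SHA-256 and UTF-8, and the helper name `stable_id` is hypothetical; the commit does not say which algorithm or encoding is used.

```python
import hashlib


def stable_id(text):
    """Hypothetical helper: derive a deterministic identifier from a string."""
    # hashlib requires bytes; passing a str raises TypeError.
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


try:
    hashlib.sha256("not bytes")
    raised = False
except TypeError:
    raised = True

identifier = stable_id("test case: summarize the report")
```

Encoding explicitly also makes the identifier independent of the platform's default encoding.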
devin-ai-integration[bot]
3ada4053bd Fix #3149: Add missing create_directory parameter to Task class (#3150)
* Fix #3149: Add missing create_directory parameter to Task class

- Add create_directory field with default value True for backward compatibility
- Update _save_file method to respect create_directory parameter
- Add comprehensive tests covering all scenarios
- Maintain existing behavior when create_directory=True (default)

The create_directory parameter was documented but missing from implementation.
Users can now control directory creation behavior:
- create_directory=True (default): Creates directories if they don't exist
- create_directory=False: Raises RuntimeError if directory doesn't exist

Fixes issue where users got TypeError when trying to use the documented
create_directory parameter.

Co-Authored-By: João <joao@crewai.com>

* Fix lint: Remove unused import os from test_create_directory_true

- Removes F401 lint error: 'os' imported but unused
- All lint checks should now pass

Co-Authored-By: João <joao@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: João <joao@crewai.com>
2025-07-14 08:15:41 -04:00
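The `create_directory` behaviour described in commit 3ada4053bd can be sketched as a standalone function. `save_output` is a hypothetical helper written for illustration, not the `Task._save_file` method itself; only the two documented outcomes (create the directory, or raise `RuntimeError`) are taken from the commit message.

```python
import os
import tempfile


def save_output(path, content, create_directory=True):
    """Hypothetical helper mirroring the described create_directory semantics."""
    directory = os.path.dirname(path)
    if directory and not os.path.exists(directory):
        if not create_directory:
            raise RuntimeError(
                f"Directory {directory} does not exist and create_directory is False"
            )
        os.makedirs(directory, exist_ok=True)
    with open(path, "w") as handle:
        handle.write(content)
    return path


base = tempfile.mkdtemp()
created = save_output(os.path.join(base, "reports", "out.txt"), "done")
```

Keeping `True` as the default preserves the earlier behaviour for callers that never pass the parameter.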
Vidit Ostwal
e7a5747c6b Comparing BaseLLM class instead of LLM (#3120)
* Comparing BaseLLM class instead of LLM

* Fixed test cases

* Fixed Linting Issues

* removed last line

---------

Co-authored-by: Lucas Gomide <lucaslg200@gmail.com>
2025-07-11 20:50:36 -04:00
Vidit Ostwal
eec1262d4f Fix agent knowledge (#2831)
* Added add_sources()

* Fixed the agent knowledge querying

* Added test cases

* Fixed linting issue

* Fixed logic

* Seems like a flaky test case

* Minor changes

* Added knowledge attribute to the crew documentation

* Flaky test

* fixed spaces

* Flaky Test Case

* Seems like a flaky test case

---------

Co-authored-by: Lucas Gomide <lucaslg200@gmail.com>
2025-07-11 13:52:26 -04:00
Tony Kipkemboi
c6caa763d7 docs: Add guardrail attribute documentation and examples (#3139)
- Document string-based guardrails in tasks
- Add guardrail examples to YAML configuration
- Fix Python code formatting in PT-BR CLI docs
2025-07-11 13:32:59 -04:00
Lucas Gomide
08fa3797ca Introducing Agent evaluation (#3130)
* feat: add exchanged messages in LLMCallCompletedEvent

* feat: add GoalAlignment metric for Agent evaluation

* feat: add SemanticQuality metric for Agent evaluation

* feat: add Tool Metrics for Agent evaluation

* feat: add Reasoning Metrics for Agent evaluation, still in progress

* feat: add AgentEvaluator class

This class will evaluate an Agent's results and report them to the user

* fix: do not evaluate Agent by default

This is an experimental feature; we still need to refine it further

* test: add Agent eval tests

* fix: render all feedback per iteration

* style: resolve linter issues

* style: fix mypy issues

* fix: allow messages to be empty on LLMCallCompletedEvent
2025-07-11 13:18:03 -04:00
Greyson LaLonde
bf8fa3232b Add SQLite FTS5 support to test workflow (#3140)
* Add SQLite FTS5 support to test workflow

* Add explanatory comment for SQLite FTS5 workaround
2025-07-11 12:01:25 -04:00
Heitor Carvalho
a6e60a5d42 fix: use production workos environment id (#3129)
2025-07-09 17:09:01 -04:00
Lorenze Jay
7b0f3aabd9 chore: update crewAI and dependencies to version 0.141.0 and 0.51.0 (#3128)
- Bump crewAI version to 0.141.0 in __init__.py for alignment with updated dependencies.
- Update `crewai-tools` dependency version to 0.51.0 in pyproject.toml and related template files.
- Add new testing dependencies: pytest-split and pytest-xdist for improved test execution.
- Ensure compatibility with the latest package versions in uv.lock and template files.
2025-07-09 10:37:06 -07:00
159 changed files with 12719 additions and 1503 deletions

3
.gitignore vendored

@@ -26,4 +26,5 @@ test_flow.html
crewairules.mdc
plan.md
conceptual_plan.md
build_image
build_image
chromadb-*.lock


@@ -9,12 +9,7 @@
},
"favicon": "/images/favicon.svg",
"contextual": {
"options": [
"copy",
"view",
"chatgpt",
"claude"
]
"options": ["copy", "view", "chatgpt", "claude"]
},
"navigation": {
"languages": [
@@ -37,11 +32,6 @@
"href": "https://chatgpt.com/g/g-qqTuUWsBY-crewai-assistant",
"icon": "robot"
},
{
"anchor": "Get Help",
"href": "mailto:support@crewai.com",
"icon": "headset"
},
{
"anchor": "Releases",
"href": "https://github.com/crewAIInc/crewAI/releases",
@@ -55,32 +45,22 @@
"groups": [
{
"group": "Get Started",
"pages": [
"en/introduction",
"en/installation",
"en/quickstart"
]
"pages": ["en/introduction", "en/installation", "en/quickstart"]
},
{
"group": "Guides",
"pages": [
{
"group": "Strategy",
"pages": [
"en/guides/concepts/evaluating-use-cases"
]
"pages": ["en/guides/concepts/evaluating-use-cases"]
},
{
"group": "Agents",
"pages": [
"en/guides/agents/crafting-effective-agents"
]
"pages": ["en/guides/agents/crafting-effective-agents"]
},
{
"group": "Crews",
"pages": [
"en/guides/crews/first-crew"
]
"pages": ["en/guides/crews/first-crew"]
},
{
"group": "Flows",
@@ -94,7 +74,6 @@
"pages": [
"en/guides/advanced/customizing-prompts",
"en/guides/advanced/fingerprinting"
]
}
]
@@ -182,7 +161,9 @@
"en/tools/search-research/websitesearchtool",
"en/tools/search-research/codedocssearchtool",
"en/tools/search-research/youtubechannelsearchtool",
"en/tools/search-research/youtubevideosearchtool"
"en/tools/search-research/youtubevideosearchtool",
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool"
]
},
{
@@ -237,10 +218,12 @@
"en/observability/overview",
"en/observability/agentops",
"en/observability/arize-phoenix",
"en/observability/langdb",
"en/observability/langfuse",
"en/observability/langtrace",
"en/observability/maxim",
"en/observability/mlflow",
"en/observability/neatlogs",
"en/observability/openlit",
"en/observability/opik",
"en/observability/patronus-evaluation",
@@ -274,9 +257,7 @@
},
{
"group": "Telemetry",
"pages": [
"en/telemetry"
]
"pages": ["en/telemetry"]
}
]
},
@@ -285,9 +266,7 @@
"groups": [
{
"group": "Getting Started",
"pages": [
"en/enterprise/introduction"
]
"pages": ["en/enterprise/introduction"]
},
{
"group": "Features",
@@ -342,9 +321,7 @@
},
{
"group": "Resources",
"pages": [
"en/enterprise/resources/frequently-asked-questions"
]
"pages": ["en/enterprise/resources/frequently-asked-questions"]
}
]
},
@@ -353,9 +330,7 @@
"groups": [
{
"group": "Getting Started",
"pages": [
"en/api-reference/introduction"
]
"pages": ["en/api-reference/introduction"]
},
{
"group": "Endpoints",
@@ -365,16 +340,13 @@
},
{
"tab": "Examples",
"groups": [
"groups": [
{
"group": "Examples",
"pages": [
"en/examples/example"
]
"pages": ["en/examples/example"]
}
]
}
]
},
{
@@ -396,11 +368,6 @@
"href": "https://chatgpt.com/g/g-qqTuUWsBY-crewai-assistant",
"icon": "robot"
},
{
"anchor": "Obter Ajuda",
"href": "mailto:support@crewai.com",
"icon": "headset"
},
{
"anchor": "Lançamentos",
"href": "https://github.com/crewAIInc/crewAI/releases",
@@ -425,21 +392,15 @@
"pages": [
{
"group": "Estratégia",
"pages": [
"pt-BR/guides/concepts/evaluating-use-cases"
]
"pages": ["pt-BR/guides/concepts/evaluating-use-cases"]
},
{
"group": "Agentes",
"pages": [
"pt-BR/guides/agents/crafting-effective-agents"
]
"pages": ["pt-BR/guides/agents/crafting-effective-agents"]
},
{
"group": "Crews",
"pages": [
"pt-BR/guides/crews/first-crew"
]
"pages": ["pt-BR/guides/crews/first-crew"]
},
{
"group": "Flows",
@@ -595,6 +556,7 @@
"pt-BR/observability/overview",
"pt-BR/observability/agentops",
"pt-BR/observability/arize-phoenix",
"pt-BR/observability/langdb",
"pt-BR/observability/langfuse",
"pt-BR/observability/langtrace",
"pt-BR/observability/maxim",
@@ -632,9 +594,7 @@
},
{
"group": "Telemetria",
"pages": [
"pt-BR/telemetry"
]
"pages": ["pt-BR/telemetry"]
}
]
},
@@ -643,9 +603,7 @@
"groups": [
{
"group": "Começando",
"pages": [
"pt-BR/enterprise/introduction"
]
"pages": ["pt-BR/enterprise/introduction"]
},
{
"group": "Funcionalidades",
@@ -710,9 +668,7 @@
"groups": [
{
"group": "Começando",
"pages": [
"pt-BR/api-reference/introduction"
]
"pages": ["pt-BR/api-reference/introduction"]
},
{
"group": "Endpoints",
@@ -722,16 +678,13 @@
},
{
"tab": "Exemplos",
"groups": [
"groups": [
{
"group": "Exemplos",
"pages": [
"pt-BR/examples/example"
]
"pages": ["pt-BR/examples/example"]
}
]
}
]
}
]

@@ -88,7 +88,7 @@ crewai replay [OPTIONS]
- `-t, --task_id TEXT`: Replay the crew from this task ID, including all subsequent tasks
Example:
```shell Terminal
```shell Terminal
crewai replay -t task_123456
```
@@ -134,7 +134,7 @@ crewai test [OPTIONS]
- `-m, --model TEXT`: LLM Model to run the tests on the Crew (default: "gpt-4o-mini")
Example:
```shell Terminal
```shell Terminal
crewai test -n 5 -m gpt-3.5-turbo
```
@@ -151,7 +151,7 @@ Starting from version 0.103.0, the `crewai run` command can be used to run both
</Note>
<Note>
Make sure to run these commands from the directory where your CrewAI project is set up.
Make sure to run these commands from the directory where your CrewAI project is set up.
Some commands may require additional configuration or setup within your project structure.
</Note>
@@ -235,7 +235,7 @@ You must be authenticated to CrewAI Enterprise to use these organization managem
- **Deploy the Crew**: Once you are authenticated, you can deploy your crew or flow to CrewAI Enterprise.
```shell Terminal
crewai deploy push
```
```
- Initiates the deployment process on the CrewAI Enterprise platform.
- Upon successful initiation, it will output the Deployment created successfully! message along with the Deployment Name and a unique Deployment ID (UUID).
@@ -309,3 +309,82 @@ When you select a provider, the CLI will prompt you to enter the Key name and th
See the following link for each provider's key name:
* [LiteLLM Providers](https://docs.litellm.ai/docs/providers)
### 12. Configuration Management
Manage CLI configuration settings for CrewAI.
```shell Terminal
crewai config [COMMAND] [OPTIONS]
```
#### Commands:
- `list`: Display all CLI configuration parameters
```shell Terminal
crewai config list
```
- `set`: Set a CLI configuration parameter
```shell Terminal
crewai config set <key> <value>
```
- `reset`: Reset all CLI configuration parameters to default values
```shell Terminal
crewai config reset
```
#### Available Configuration Parameters
- `enterprise_base_url`: Base URL of the CrewAI Enterprise instance
- `oauth2_provider`: OAuth2 provider used for authentication (e.g., workos, okta, auth0)
- `oauth2_audience`: OAuth2 audience value, typically used to identify the target API or resource
- `oauth2_client_id`: OAuth2 client ID issued by the provider, used during authentication requests
- `oauth2_domain`: OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens
#### Examples
Display current configuration:
```shell Terminal
crewai config list
```
Example output:
```
CrewAI CLI Configuration
┏━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Setting ┃ Value ┃ Description ┃
┡━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ enterprise_base_url│ https://app.crewai.com │ Base URL of the CrewAI Enterprise instance │
│ org_name │ Not set │ Name of the currently active organization │
│ org_uuid │ Not set │ UUID of the currently active organization │
│ oauth2_provider │ workos │ OAuth2 provider used for authentication (e.g., workos, okta, auth0). │
│ oauth2_audience │ client_01YYY │ OAuth2 audience value, typically used to identify the target API or resource. │
│ oauth2_client_id │ client_01XXX │ OAuth2 client ID issued by the provider, used during authentication requests. │
│ oauth2_domain │ login.crewai.com │ OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens. │
```
Set the enterprise base URL:
```shell Terminal
crewai config set enterprise_base_url https://my-enterprise.crewai.com
```
Set OAuth2 provider:
```shell Terminal
crewai config set oauth2_provider auth0
```
Set OAuth2 domain:
```shell Terminal
crewai config set oauth2_domain my-company.auth0.com
```
Reset all configuration to defaults:
```shell Terminal
crewai config reset
```
<Note>
Configuration settings are stored in `~/.config/crewai/settings.json`. Some settings like organization name and UUID are read-only and managed through authentication and organization commands. Tool repository related settings are hidden and cannot be set directly by users.
</Note>
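As a rough illustration of the note above, the stored settings can be inspected from Python. This is a minimal sketch that assumes the file holds a single flat JSON object keyed by the parameter names listed earlier; the helper name `load_cli_settings` is hypothetical and not part of the CrewAI CLI.

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional

# Location stated in the note above.
DEFAULT_SETTINGS_PATH = Path.home() / ".config" / "crewai" / "settings.json"


def load_cli_settings(path: Optional[Path] = None) -> Dict[str, Any]:
    """Return the stored settings, or an empty dict if the file is missing.

    Hypothetical helper: assumes the file holds a single flat JSON object.
    """
    settings_path = path or DEFAULT_SETTINGS_PATH
    if not settings_path.exists():
        return {}
    with settings_path.open(encoding="utf-8") as handle:
        return json.load(handle)
```

Calling `load_cli_settings()` with no argument reads the default location. Changes should still go through `crewai config set`, since some values are read-only.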

@@ -20,8 +20,7 @@ A crew in crewAI represents a collaborative group of agents working together to
| **Function Calling LLM** _(optional)_ | `function_calling_llm` | If passed, the crew will use this LLM to do function calling for tools for all agents in the crew. Each agent can have its own LLM, which overrides the crew's LLM for function calling. |
| **Config** _(optional)_ | `config` | Optional configuration settings for the crew, in `Json` or `Dict[str, Any]` format. |
| **Max RPM** _(optional)_ | `max_rpm` | Maximum requests per minute the crew adheres to during execution. Defaults to `None`. |
| **Memory** _(optional)_ | `memory` | Utilized for storing execution memories (short-term, long-term, entity memory). |
| **Memory Config** _(optional)_ | `memory_config` | Configuration for the memory provider to be used by the crew. |
| **Memory** _(optional)_ | `memory` | Utilized for storing execution memories (short-term, long-term, entity memory). |
| **Cache** _(optional)_ | `cache` | Specifies whether to use a cache for storing the results of tools' execution. Defaults to `True`. |
| **Embedder** _(optional)_ | `embedder` | Configuration for the embedder to be used by the crew. Mostly used by memory for now. Default is `{"provider": "openai"}`. |
| **Step Callback** _(optional)_ | `step_callback` | A function that is called after each step of every agent. This can be used to log the agent's actions or to perform other operations; it won't override the agent-specific `step_callback`. |
@@ -32,6 +31,7 @@ A crew in crewAI represents a collaborative group of agents working together to
| **Prompt File** _(optional)_ | `prompt_file` | Path to the prompt JSON file to be used for the crew. |
| **Planning** *(optional)* | `planning` | Adds planning ability to the Crew. When activated before each Crew iteration, all Crew data is sent to an AgentPlanner that will plan the tasks and this plan will be added to each task description. |
| **Planning LLM** *(optional)* | `planning_llm` | The language model used by the AgentPlanner in a planning process. |
| **Knowledge Sources** _(optional)_ | `knowledge_sources` | Knowledge sources available at the crew level, accessible to all the agents. |
<Tip>
**Crew Max RPM**: The `max_rpm` attribute sets the maximum number of requests per minute the crew can perform to avoid rate limits and will override individual agents' `max_rpm` settings if you set it.

@@ -270,7 +270,7 @@ In this section, you'll find detailed examples that help you select, configure,
from crewai import LLM
llm = LLM(
model="gemini/gemini-1.5-pro-latest",
model="gemini-1.5-pro-latest", # or vertex_ai/gemini-1.5-pro-latest
temperature=0.7,
vertex_credentials=vertex_credentials_json
)

@@ -9,8 +9,7 @@ icon: database
The CrewAI framework provides a sophisticated memory system designed to significantly enhance AI agent capabilities. CrewAI offers **three distinct memory approaches** that serve different use cases:
1. **Basic Memory System** - Built-in short-term, long-term, and entity memory
2. **User Memory** - User-specific memory with Mem0 integration (legacy approach)
3. **External Memory** - Standalone external memory providers (new approach)
2. **External Memory** - Standalone external memory providers
## Memory System Components
@@ -19,7 +18,7 @@ The CrewAI framework provides a sophisticated memory system designed to signific
| **Short-Term Memory**| Temporarily stores recent interactions and outcomes using `RAG`, enabling agents to recall and utilize information relevant to their current context during the current execution.|
| **Long-Term Memory** | Preserves valuable insights and learnings from past executions, allowing agents to build and refine their knowledge over time. |
| **Entity Memory** | Captures and organizes information about entities (people, places, concepts) encountered during tasks, facilitating deeper understanding and relationship mapping. Uses `RAG` for storing entity information. |
| **Contextual Memory**| Maintains the context of interactions by combining `ShortTermMemory`, `LongTermMemory`, and `EntityMemory`, aiding in the coherence and relevance of agent responses over a sequence of tasks or a conversation. |
| **Contextual Memory**| Maintains the context of interactions by combining `ShortTermMemory`, `LongTermMemory`, `ExternalMemory` and `EntityMemory`, aiding in the coherence and relevance of agent responses over a sequence of tasks or a conversation. |
## 1. Basic Memory System (Recommended)
@@ -202,7 +201,7 @@ crew = Crew(
tasks=[task],
memory=True,
embedder={
"provider": "anthropic", # Match your LLM provider
"provider": "anthropic", # Match your LLM provider
"config": {
"api_key": "your-anthropic-key",
"model": "text-embedding-3-small"
@@ -623,7 +622,7 @@ for provider in providers_to_test:
**Model not found errors:**
```python
# Verify model availability
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
configurator = EmbeddingConfigurator()
try:
@@ -684,67 +683,18 @@ print(f"OpenAI: {openai_time:.2f}s")
print(f"Ollama: {ollama_time:.2f}s")
```
## 2. User Memory with Mem0 (Legacy)
## 2. External Memory
External Memory provides a standalone memory system that operates independently from the crew's built-in memory. This is ideal for specialized memory providers or cross-application memory sharing.
<Warning>
**Legacy Approach**: While fully functional, this approach is considered legacy. For new projects requiring user-specific memory, consider using External Memory instead.
</Warning>
User Memory integrates with [Mem0](https://mem0.ai/) to provide user-specific memory that persists across sessions and integrates with the crew's contextual memory system.
### Prerequisites
```bash
pip install mem0ai
```
### Mem0 Cloud Configuration
### Basic External Memory with Mem0
```python
import os
from crewai import Crew, Process
from crewai import Agent, Crew, Process, Task
from crewai.memory.external.external_memory import ExternalMemory
# Set your Mem0 API key
os.environ["MEM0_API_KEY"] = "m0-your-api-key"
crew = Crew(
agents=[...],
tasks=[...],
memory=True, # Required for contextual memory integration
memory_config={
"provider": "mem0",
"config": {"user_id": "john"},
"user_memory": {} # Required - triggers user memory initialization
},
process=Process.sequential,
verbose=True
)
```
### Advanced Mem0 Configuration
```python
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
memory_config={
"provider": "mem0",
"config": {
"user_id": "john",
"org_id": "my_org_id", # Optional
"project_id": "my_project_id", # Optional
"api_key": "custom-api-key" # Optional - overrides env var
},
"user_memory": {}
}
)
```
### Local Mem0 Configuration
```python
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
memory_config={
# Create external memory instance with local Mem0 Configuration
external_memory = ExternalMemory(
embedder_config={
"provider": "mem0",
"config": {
"user_id": "john",
@@ -761,37 +711,60 @@ crew = Crew(
"provider": "openai",
"config": {"api_key": "your-api-key", "model": "text-embedding-3-small"}
}
}
},
"infer": True # Optional defaults to True
},
"user_memory": {}
}
)
```
## 3. External Memory (New Approach)
External Memory provides a standalone memory system that operates independently from the crew's built-in memory. This is ideal for specialized memory providers or cross-application memory sharing.
### Basic External Memory with Mem0
```python
import os
from crewai import Agent, Crew, Process, Task
from crewai.memory.external.external_memory import ExternalMemory
os.environ["MEM0_API_KEY"] = "your-api-key"
# Create external memory instance
external_memory = ExternalMemory(
embedder_config={
"provider": "mem0",
"config": {"user_id": "U-123"}
}
)
crew = Crew(
agents=[...],
tasks=[...],
external_memory=external_memory, # Separate from basic memory
external_memory=external_memory, # Separate from basic memory
process=Process.sequential,
verbose=True
)
```
### Advanced External Memory with Mem0 Client
When using the Mem0 client, you can customize the memory configuration further with parameters such as `includes`, `excludes`, `custom_categories`, `infer`, and `run_id` (the last applies only to short-term memory).
You can find more details in the [Mem0 documentation](https://docs.mem0.ai/).
```python
import os
from crewai import Agent, Crew, Process, Task
from crewai.memory.external.external_memory import ExternalMemory
new_categories = [
{"lifestyle_management_concerns": "Tracks daily routines, habits, hobbies and interests including cooking, time management and work-life balance"},
{"seeking_structure": "Documents goals around creating routines, schedules, and organized systems in various life areas"},
{"personal_information": "Basic information about the user including name, preferences, and personality traits"}
]
os.environ["MEM0_API_KEY"] = "your-api-key"
# Create external memory instance with Mem0 Client
external_memory = ExternalMemory(
embedder_config={
"provider": "mem0",
"config": {
"user_id": "john",
"org_id": "my_org_id", # Optional
"project_id": "my_project_id", # Optional
"api_key": "custom-api-key", # Optional - overrides env var
"run_id": "my_run_id", # Optional - for short-term memory
"includes": "include1", # Optional
"excludes": "exclude1", # Optional
"infer": True, # Optional - defaults to True
"custom_categories": new_categories # Optional - custom categories for user memory
},
}
)
crew = Crew(
agents=[...],
tasks=[...],
external_memory=external_memory, # Separate from basic memory
process=Process.sequential,
verbose=True
)
@@ -830,17 +803,18 @@ crew = Crew(
)
```
## Memory System Comparison
## 🧠 Memory System Comparison
| **Category** | **Feature** | **Basic Memory** | **External Memory** |
|---------------------|------------------------|-----------------------------|------------------------------|
| **Ease of Use** | Setup Complexity | Simple | Moderate |
| | Integration | Built-in (contextual) | Standalone |
| **Persistence** | Storage | Local files | Custom / Mem0 |
| | Cross-session Support | ✅ | ✅ |
| **Personalization** | User-specific Memory | ❌ | ✅ |
| | Custom Providers | Limited | Any provider |
| **Use Case Fit** | Recommended For | Most general use cases | Specialized / custom needs |
| Feature | Basic Memory | User Memory (Legacy) | External Memory |
|---------|-------------|---------------------|----------------|
| **Setup Complexity** | Simple | Medium | Medium |
| **Integration** | Built-in contextual | Contextual + User-specific | Standalone |
| **Storage** | Local files | Mem0 Cloud/Local | Custom/Mem0 |
| **Cross-session** | ✅ | ✅ | ✅ |
| **User-specific** | ❌ | ✅ | ✅ |
| **Custom providers** | Limited | Mem0 only | Any provider |
| **Recommended for** | Most use cases | Legacy projects | Specialized needs |
## Supported Embedding Providers

@@ -54,9 +54,11 @@ crew = Crew(
| **Markdown** _(optional)_ | `markdown` | `Optional[bool]` | Whether the task should instruct the agent to return the final answer formatted in Markdown. Defaults to False. |
| **Config** _(optional)_ | `config` | `Optional[Dict[str, Any]]` | Task-specific configuration parameters. |
| **Output File** _(optional)_ | `output_file` | `Optional[str]` | File path for storing the task output. |
| **Create Directory** _(optional)_ | `create_directory` | `Optional[bool]` | Whether to create the directory for output_file if it doesn't exist. Defaults to True. |
| **Output JSON** _(optional)_ | `output_json` | `Optional[Type[BaseModel]]` | A Pydantic model to structure the JSON output. |
| **Output Pydantic** _(optional)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | A Pydantic model for task output. |
| **Callback** _(optional)_ | `callback` | `Optional[Any]` | Function/object to be executed after task completion. |
| **Guardrail** _(optional)_ | `guardrail` | `Optional[Callable]` | Function to validate task output before proceeding to next task. |
## Creating Tasks
@@ -332,9 +334,11 @@ Task guardrails provide a way to validate and transform task outputs before they
are passed to the next task. This feature helps ensure data quality and provides
feedback to agents when their output doesn't meet specific criteria.
### Using Task Guardrails
Guardrails are implemented as Python functions that contain custom validation logic, giving you complete control over the validation process and ensuring reliable, deterministic results.
To add a guardrail to a task, provide a validation function through the `guardrail` parameter:
### Function-Based Guardrails
To add a function-based guardrail to a task, provide a validation function through the `guardrail` parameter:
```python Code
from typing import Tuple, Union, Dict, Any
@@ -372,9 +376,7 @@ blog_task = Task(
- On success: it returns a tuple of `(bool, Any)`. For example: `(True, validated_result)`
- On failure: it returns a tuple of `(bool, str)`. For example: `(False, "Error message explaining the failure")`
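The return contract above can be sketched as a small function-based guardrail. This is an illustrative sketch, not CrewAI's own code: the function name and the 50-word threshold are made up, and it assumes the task output is either a plain string or an object that exposes its text through a `raw` attribute.

```python
from typing import Any, Tuple


def validate_min_words(result: Any, min_words: int = 50) -> Tuple[bool, Any]:
    """Hypothetical guardrail: require at least `min_words` words of output."""
    # Assumption: the output is a plain string or an object with a `raw` string.
    text = getattr(result, "raw", result)
    if not isinstance(text, str):
        return (False, "Output must be text")
    word_count = len(text.split())
    if word_count < min_words:
        # Failure: (False, error message describing what went wrong)
        return (False, f"Output has {word_count} words; at least {min_words} are required")
    # Success: (True, validated result)
    return (True, text.strip())
```

A function like this would be passed as `guardrail=validate_min_words` when constructing the `Task`.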
### LLMGuardrail
The `LLMGuardrail` class offers a robust mechanism for validating task outputs.
### Error Handling Best Practices
@@ -798,184 +800,91 @@ While creating and executing tasks, certain validation mechanisms are in place t
These validations help in maintaining the consistency and reliability of task executions within the crewAI framework.
## Task Guardrails
Task guardrails provide a powerful way to validate, transform, or filter task outputs before they are passed to the next task. Guardrails are optional functions that execute before the next task starts, allowing you to ensure that task outputs meet specific requirements or formats.
### Basic Usage
#### Define your own logic to validate
```python Code
from typing import Tuple, Union
from crewai import Task
def validate_json_output(result: str) -> Tuple[bool, Union[dict, str]]:
"""Validate that the output is valid JSON."""
try:
json_data = json.loads(result)
return (True, json_data)
except json.JSONDecodeError:
return (False, "Output must be valid JSON")
task = Task(
description="Generate JSON data",
expected_output="Valid JSON object",
guardrail=validate_json_output
)
```
#### Leverage a no-code approach for validation
```python Code
from crewai import Task
task = Task(
description="Generate JSON data",
expected_output="Valid JSON object",
guardrail="Ensure the response is a valid JSON object"
)
```
#### Using YAML
```yaml
research_task:
...
guardrail: make sure each bullet contains a minimum of 100 words
...
```
```python Code
@CrewBase
class InternalCrew:
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
...
@task
def research_task(self):
return Task(config=self.tasks_config["research_task"]) # type: ignore[index]
...
```
#### Use custom models for code generation
```python Code
from crewai import Task
from crewai.llm import LLM
task = Task(
description="Generate JSON data",
expected_output="Valid JSON object",
guardrail=LLMGuardrail(
description="Ensure the response is a valid JSON object",
llm=LLM(model="gpt-4o-mini"),
)
)
```
### How Guardrails Work
1. **Optional Attribute**: Guardrails are an optional attribute at the task level, allowing you to add validation only where needed.
2. **Execution Timing**: The guardrail function is executed before the next task starts, ensuring valid data flow between tasks.
3. **Return Format**: Guardrails must return a tuple of `(success, data)`:
- If `success` is `True`, `data` is the validated/transformed result
- If `success` is `False`, `data` is the error message
4. **Result Routing**:
- On success (`True`), the result is automatically passed to the next task
- On failure (`False`), the error is sent back to the agent to generate a new answer
### Common Use Cases
#### Data Format Validation
```python Code
def validate_email_format(result: str) -> Tuple[bool, Union[str, str]]:
"""Ensure the output contains a valid email address."""
import re
email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
if re.match(email_pattern, result.strip()):
return (True, result.strip())
return (False, "Output must be a valid email address")
```
#### Content Filtering
```python Code
def filter_sensitive_info(result: str) -> Tuple[bool, Union[str, str]]:
"""Remove or validate sensitive information."""
sensitive_patterns = ['SSN:', 'password:', 'secret:']
for pattern in sensitive_patterns:
if pattern.lower() in result.lower():
return (False, f"Output contains sensitive information ({pattern})")
return (True, result)
```
#### Data Transformation
```python Code
def normalize_phone_number(result: str) -> Tuple[bool, Union[str, str]]:
"""Ensure phone numbers are in a consistent format."""
import re
digits = re.sub(r'\D', '', result)
if len(digits) == 10:
formatted = f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
return (True, formatted)
return (False, "Output must be a 10-digit phone number")
```
### Advanced Features
#### Chaining Multiple Validations
```python Code
def chain_validations(*validators):
"""Chain multiple validators together."""
def combined_validator(result):
for validator in validators:
success, data = validator(result)
if not success:
return (False, data)
result = data
return (True, result)
return combined_validator
# Usage
task = Task(
description="Get user contact info",
expected_output="Email and phone",
guardrail=chain_validations(
validate_email_format,
filter_sensitive_info
)
)
```
#### Custom Retry Logic
```python Code
task = Task(
description="Generate data",
expected_output="Valid data",
guardrail=validate_data,
max_retries=5 # Override default retry limit
)
```
## Creating Directories when Saving Files
You can now specify if a task should create directories when saving its output to a file. This is particularly useful for organizing outputs and ensuring that file paths are correctly structured.
The `create_directory` parameter controls whether CrewAI should automatically create directories when saving task outputs to files. This feature is particularly useful for organizing outputs and ensuring that file paths are correctly structured, especially when working with complex project hierarchies.
### Default Behavior
By default, `create_directory=True`, which means CrewAI will automatically create any missing directories in the output file path:
```python Code
# ...
save_output_task = Task(
description='Save the summarized AI news to a file',
expected_output='File saved successfully',
agent=research_agent,
tools=[file_save_tool],
output_file='outputs/ai_news_summary.txt',
create_directory=True
# Default behavior - directories are created automatically
report_task = Task(
description='Generate a comprehensive market analysis report',
expected_output='A detailed market analysis with charts and insights',
agent=analyst_agent,
output_file='reports/2025/market_analysis.md', # Creates 'reports/2025/' if it doesn't exist
markdown=True
)
```
#...
### Disabling Directory Creation
If you want to prevent automatic directory creation and ensure that the directory already exists, set `create_directory=False`:
```python Code
# Strict mode - directory must already exist
strict_output_task = Task(
description='Save critical data that requires existing infrastructure',
expected_output='Data saved to pre-configured location',
agent=data_agent,
output_file='secure/vault/critical_data.json',
create_directory=False # Will raise RuntimeError if 'secure/vault/' doesn't exist
)
```
### YAML Configuration
You can also configure this behavior in your YAML task definitions:
```yaml tasks.yaml
analysis_task:
description: >
Generate quarterly financial analysis
expected_output: >
A comprehensive financial report with quarterly insights
agent: financial_analyst
output_file: reports/quarterly/q4_2024_analysis.pdf
create_directory: true # Automatically create 'reports/quarterly/' directory
audit_task:
description: >
Perform compliance audit and save to existing audit directory
expected_output: >
A compliance audit report
agent: auditor
output_file: audit/compliance_report.md
create_directory: false # Directory must already exist
```
### Use Cases
**Automatic Directory Creation (`create_directory=True`):**
- Development and prototyping environments
- Dynamic report generation with date-based folders
- Automated workflows where directory structure may vary
- Multi-tenant applications with user-specific folders
**Manual Directory Management (`create_directory=False`):**
- Production environments with strict file system controls
- Security-sensitive applications where directories must be pre-configured
- Systems with specific permission requirements
- Compliance environments where directory creation is audited
### Error Handling
When `create_directory=False` and the directory doesn't exist, CrewAI will raise a `RuntimeError`:
```python Code
try:
result = crew.kickoff()
except RuntimeError as e:
# Handle missing directory error
print(f"Directory creation failed: {e}")
# Create directory manually or use fallback location
```
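One way to act on the "create directory manually" comment above is to make sure the parent directory exists before the crew starts. The sketch below uses only the standard library; `ensure_output_dir` is a hypothetical helper name and is not part of CrewAI.

```python
from pathlib import Path


def ensure_output_dir(output_file: str) -> Path:
    """Create the parent directory of `output_file` if needed and return it.

    Hypothetical helper, not part of CrewAI.
    """
    parent = Path(output_file).parent
    # parents=True builds intermediate folders; exist_ok=True makes this idempotent.
    parent.mkdir(parents=True, exist_ok=True)
    return parent
```

Calling `ensure_output_dir("secure/vault/critical_data.json")` before `crew.kickoff()` would create `secure/vault/` up front, so a task configured with `create_directory=False` finds the directory in place.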
Check out the video below to see how to use structured outputs in CrewAI:

@@ -44,6 +44,19 @@ The `MCPServerAdapter` class from `crewai-tools` is the primary way to connect t
Using a Python context manager (`with` statement) is the **recommended approach** for `MCPServerAdapter`. It automatically handles starting and stopping the connection to the MCP server.
## Connection Configuration
The `MCPServerAdapter` supports several configuration options to customize the connection behavior:
- **`connect_timeout`** (optional): Maximum time in seconds to wait for establishing a connection to the MCP server. Defaults to 30 seconds if not specified. This is particularly useful for remote servers that may have variable response times.
```python
# Example with custom connection timeout
with MCPServerAdapter(server_params, connect_timeout=60) as tools:
# Connection will time out after 60 seconds if not established
pass
```
```python
from crewai import Agent
from crewai_tools import MCPServerAdapter
@@ -70,7 +83,7 @@ server_params = {
}
# Example usage (uncomment and adapt once server_params is set):
with MCPServerAdapter(server_params) as mcp_tools:
with MCPServerAdapter(server_params, connect_timeout=60) as mcp_tools:
print(f"Available tools: {[tool.name for tool in mcp_tools]}")
my_agent = Agent(
@@ -95,7 +108,7 @@ There are two ways to filter tools:
### Accessing a specific tool using dictionary-style indexing.
```python
with MCPServerAdapter(server_params) as mcp_tools:
with MCPServerAdapter(server_params, connect_timeout=60) as mcp_tools:
print(f"Available tools: {[tool.name for tool in mcp_tools]}")
my_agent = Agent(
@@ -112,7 +125,7 @@ with MCPServerAdapter(server_params) as mcp_tools:
### Pass a list of tool names to the `MCPServerAdapter` constructor.
```python
with MCPServerAdapter(server_params, "tool_name") as mcp_tools:
with MCPServerAdapter(server_params, "tool_name", connect_timeout=60) as mcp_tools:
print(f"Available tools: {[tool.name for tool in mcp_tools]}")
my_agent = Agent(

@@ -0,0 +1,286 @@
---
title: LangDB Integration
description: Govern, secure, and optimize your CrewAI workflows with LangDB AI Gateway—access 350+ models, automatic routing, cost optimization, and full observability.
icon: database
---
# Introduction
[LangDB AI Gateway](https://langdb.ai) provides OpenAI-compatible APIs for more than 350 language models and doubles as an observability platform for tracing CrewAI workflows end to end. A single `init()` call captures all agent interactions, task executions, and LLM calls, giving you full visibility into your application.
<Frame caption="LangDB CrewAI Trace Example">
<img src="/images/langdb-1.png" alt="LangDB CrewAI trace example" />
</Frame>
**Check it out:** [View the live trace example](https://app.langdb.ai/sharing/threads/3becbfed-a1be-ae84-ea3c-4942867a3e22)
## Features
### AI Gateway Capabilities
- **Access to 350+ LLMs**: Connect to all major language models through a single integration
- **Virtual Models**: Create custom model configurations with specific parameters and routing rules
- **Virtual MCP**: Enable compatibility and integration with MCP (Model Context Protocol) systems for enhanced agent communication
- **Guardrails**: Implement safety measures and compliance controls for agent behavior
### Observability & Tracing
- **Automatic Tracing**: Single `init()` call captures all CrewAI interactions
- **End-to-End Visibility**: Monitor agent workflows from start to finish
- **Tool Usage Tracking**: Track which tools agents use and their outcomes
- **Model Call Monitoring**: Detailed insights into LLM interactions
- **Performance Analytics**: Monitor latency, token usage, and costs
- **Debugging Support**: Step-through execution for troubleshooting
- **Real-time Monitoring**: Live traces and metrics dashboard
## Setup Instructions
<Steps>
<Step title="Install LangDB">
Install the LangDB client with the CrewAI extra:
```bash
pip install 'pylangdb[crewai]'
```
</Step>
<Step title="Set Environment Variables">
Configure your LangDB credentials:
```bash
export LANGDB_API_KEY="<your_langdb_api_key>"
export LANGDB_PROJECT_ID="<your_langdb_project_id>"
export LANGDB_API_BASE_URL='https://api.us-east-1.langdb.ai'
```
</Step>
<Step title="Initialize Tracing">
Import and initialize LangDB before configuring your CrewAI code:
```python
from pylangdb.crewai import init
# Initialize LangDB
init()
```
</Step>
<Step title="Configure CrewAI with LangDB">
Set up your LLM with LangDB headers:
```python
from crewai import Agent, Task, Crew, LLM
import os
# Configure LLM with LangDB headers
llm = LLM(
model="openai/gpt-4o", # Replace with the model you want to use
api_key=os.getenv("LANGDB_API_KEY"),
base_url=os.getenv("LANGDB_API_BASE_URL"),
extra_headers={"x-project-id": os.getenv("LANGDB_PROJECT_ID")}
)
```
</Step>
</Steps>
## Quick Start Example
Here's a simple example to get you started with LangDB and CrewAI:
```python
import os
from pylangdb.crewai import init

# Initialize LangDB before any CrewAI imports
init()

from crewai import Agent, Task, Crew, LLM


def create_llm(model):
    # Route requests through the LangDB gateway using the configured credentials
    return LLM(
        model=model,
        api_key=os.environ.get("LANGDB_API_KEY"),
        base_url=os.environ.get("LANGDB_API_BASE_URL"),
        extra_headers={"x-project-id": os.environ.get("LANGDB_PROJECT_ID")}
    )


# Define your agent
researcher = Agent(
    role="Research Specialist",
    goal="Research topics thoroughly",
    backstory="Expert researcher with skills in finding information",
    llm=create_llm("openai/gpt-4o"),  # Replace with the model you want to use
    verbose=True
)

# Create a task
task = Task(
    description="Research the given topic and provide a comprehensive summary",
    agent=researcher,
    expected_output="Detailed research summary with key findings"
)

# Create and run the crew
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
print(result)
```
## Complete Example: Research and Planning Agent
This comprehensive example demonstrates a multi-agent workflow with research and planning capabilities.
### Prerequisites
```bash
pip install crewai 'pylangdb[crewai]' crewai_tools setuptools python-dotenv
```
### Environment Setup
```bash
# LangDB credentials
export LANGDB_API_KEY="<your_langdb_api_key>"
export LANGDB_PROJECT_ID="<your_langdb_project_id>"
export LANGDB_API_BASE_URL='https://api.us-east-1.langdb.ai'
# Additional API keys (optional)
export SERPER_API_KEY="<your_serper_api_key>" # For web search capabilities
```
### Complete Implementation
```python
#!/usr/bin/env python3
import os
import sys

from dotenv import load_dotenv

load_dotenv()  # Load credentials from a .env file, if present, before init() runs

from pylangdb.crewai import init

init()  # Initialize LangDB before any CrewAI imports

from crewai import Agent, Task, Crew, Process, LLM
from crewai_tools import SerperDevTool


def create_llm(model):
    return LLM(
        model=model,
        api_key=os.environ.get("LANGDB_API_KEY"),
        base_url=os.environ.get("LANGDB_API_BASE_URL"),
        extra_headers={"x-project-id": os.environ.get("LANGDB_PROJECT_ID")}
    )


class ResearchPlanningCrew:
    def researcher(self) -> Agent:
        return Agent(
            role="Research Specialist",
            goal="Research topics thoroughly and compile comprehensive information",
            backstory="Expert researcher with skills in finding and analyzing information from various sources",
            tools=[SerperDevTool()],
            llm=create_llm("openai/gpt-4o"),
            verbose=True
        )

    def planner(self) -> Agent:
        return Agent(
            role="Strategic Planner",
            goal="Create actionable plans based on research findings",
            backstory="Strategic planner who breaks down complex challenges into executable plans",
            reasoning=True,
            max_reasoning_attempts=3,
            llm=create_llm("openai/anthropic/claude-3.7-sonnet"),
            verbose=True
        )

    def research_task(self, agent: Agent) -> Task:
        return Task(
            # {topic} is filled in from the inputs passed to kickoff()
            description="Research {topic} thoroughly and compile comprehensive information",
            agent=agent,
            expected_output="Comprehensive research report with key findings and insights"
        )

    def planning_task(self, agent: Agent, research_task: Task) -> Task:
        return Task(
            description="Create a strategic plan for {topic} based on the research findings",
            agent=agent,
            expected_output="Strategic execution plan with phases, goals, and actionable steps",
            context=[research_task]
        )

    def crew(self) -> Crew:
        # Build each agent and task once so the crew and the planning
        # context refer to the same objects
        researcher = self.researcher()
        planner = self.planner()
        research = self.research_task(researcher)
        planning = self.planning_task(planner, research)
        return Crew(
            agents=[researcher, planner],
            tasks=[research, planning],
            verbose=True,
            process=Process.sequential
        )


def main():
    topic = sys.argv[1] if len(sys.argv) > 1 else "Artificial Intelligence in Healthcare"
    # The topic is interpolated into the {topic} placeholders in the task descriptions
    result = ResearchPlanningCrew().crew().kickoff(inputs={"topic": topic})
    print(result)


if __name__ == "__main__":
    main()
```
### Running the Example
```bash
python main.py "Sustainable Energy Solutions"
```
## Viewing Traces in LangDB
After running your CrewAI application, you can view detailed traces in the LangDB dashboard:
<Frame caption="LangDB Trace Dashboard">
<img src="/images/langdb-2.png" alt="LangDB trace dashboard showing CrewAI workflow" />
</Frame>
### What You'll See
- **Agent Interactions**: Complete flow of agent conversations and task handoffs
- **Tool Usage**: Which tools were called, their inputs, and outputs
- **Model Calls**: Detailed LLM interactions with prompts and responses
- **Performance Metrics**: Latency, token usage, and cost tracking
- **Execution Timeline**: Step-by-step view of the entire workflow
## Troubleshooting
### Common Issues
- **No traces appearing**: Ensure `init()` is called before any CrewAI imports
- **Authentication errors**: Verify your LangDB API key and project ID
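Authentication errors often come down to a variable that was never exported in the current shell. The sketch below checks the three variables from the setup steps before `init()` is called. The helper name `missing_langdb_vars` is hypothetical and not part of `pylangdb`; only the variable names come from this guide.

```python
import os
from typing import List, Mapping

# Variable names taken from the setup steps in this guide.
REQUIRED_LANGDB_VARS = ("LANGDB_API_KEY", "LANGDB_PROJECT_ID", "LANGDB_API_BASE_URL")


def missing_langdb_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names of required LangDB variables that are unset or blank.

    Hypothetical helper for illustration; it is not part of pylangdb.
    """
    return [name for name in REQUIRED_LANGDB_VARS if not env.get(name, "").strip()]
```

Calling `missing_langdb_vars()` at the top of your script and stopping when the list is non-empty gives a clearer message than a failed gateway request.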
## Resources
<CardGroup cols={3}>
<Card title="LangDB Documentation" icon="book" href="https://docs.langdb.ai">
Official LangDB documentation and guides
</Card>
<Card title="LangDB Guides" icon="graduation-cap" href="https://docs.langdb.ai/guides">
Step-by-step tutorials for building AI agents
</Card>
<Card title="GitHub Examples" icon="github" href="https://github.com/langdb/langdb-samples/tree/main/examples/crewai" >
Complete CrewAI integration examples
</Card>
<Card title="LangDB Dashboard" icon="chart-line" href="https://app.langdb.ai">
Access your traces and analytics
</Card>
<Card title="Model Catalog" icon="list" href="https://app.langdb.ai/models">
Browse 350+ available language models
</Card>
<Card title="Enterprise Features" icon="building" href="https://docs.langdb.ai/enterprise">
Self-hosted options and enterprise capabilities
</Card>
</CardGroup>
## Next Steps
This guide covered the basics of integrating LangDB AI Gateway with CrewAI. To further enhance your AI workflows, explore:
- **Virtual Models**: Create custom model configurations with routing strategies
- **Guardrails & Safety**: Implement content filtering and compliance controls
- **Production Deployment**: Configure fallbacks, retries, and load balancing
For more advanced features and use cases, visit the [LangDB Documentation](https://docs.langdb.ai) or explore the [Model Catalog](https://app.langdb.ai/models) to discover all available models.

View File

@@ -0,0 +1,134 @@
---
title: Neatlogs Integration
description: Understand, debug, and share your CrewAI agent runs
icon: magnifying-glass-chart
---
# Introduction
Neatlogs helps you **see what your agent did**, **why**, and **share it**.
It captures every step: thoughts, tool calls, responses, evaluations. No raw logs. Just clear, structured traces. Great for debugging and collaboration.
## Why use Neatlogs?
CrewAI agents use multiple tools and reasoning steps. When something goes wrong, you need context — not just errors.
Neatlogs lets you:
- Follow the full decision path
- Add feedback directly on steps
- Chat with the trace using the AI assistant
- Share runs publicly for feedback
- Turn insights into tasks
All in one place.
Manage your traces effortlessly
![Traces](/images/neatlogs-1.png)
![Trace Response](/images/neatlogs-2.png)
The best UX to view a CrewAI trace. Post comments anywhere you want. Use AI to debug.
![Trace Details](/images/neatlogs-3.png)
![AI Chat Bot With A Trace](/images/neatlogs-4.png)
![Comments Drawer](/images/neatlogs-5.png)
## Core Features
- **Trace Viewer**: Track thoughts, tools, and decisions in sequence
- **Inline Comments**: Tag teammates on any trace step
- **Feedback & Evaluation**: Mark outputs as correct or incorrect
- **Error Highlighting**: Automatic flagging of API/tool failures
- **Task Conversion**: Convert comments into assigned tasks
- **Ask the Trace (AI)**: Chat with your trace using the Neatlogs AI bot
- **Public Sharing**: Publish trace links to your community
## Quick Setup with CrewAI
<Steps>
<Step title="Sign Up & Get API Key">
Visit [neatlogs.com](https://neatlogs.com/?utm_source=crewAI-docs), create a project, and copy the API key.
</Step>
<Step title="Install SDK">
```bash
pip install neatlogs
```
(Latest version 0.8.0, Python 3.8+; MIT license)
</Step>
<Step title="Initialize Neatlogs">
Before starting your CrewAI agents, add:
```python
import neatlogs
neatlogs.init("YOUR_PROJECT_API_KEY")
```
Agents run as usual. Neatlogs captures everything automatically.
</Step>
</Steps>
## Under the Hood
According to GitHub, Neatlogs:
- Captures thoughts, tool calls, responses, errors, and token stats
- Supports AI-powered task generation and robust evaluation workflows
All with just two lines of code.
## Watch It Work
### 🔍 Full Demo (4min)
<iframe
width="100%"
height="315"
src="https://www.youtube.com/embed/8KDme9T2I7Q?si=b8oHteaBwFNs_Duk"
title="YouTube video player"
frameBorder="0"
allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture"
allowFullScreen
></iframe>
### ⚙️ CrewAI Integration (30s)
<iframe
className="w-full aspect-video rounded-xl"
src="https://www.loom.com/embed/9c78b552af43452bb3e4783cb8d91230?sid=e9d7d370-a91a-49b0-809e-2f375d9e801d"
title="Loom video player"
frameBorder="0"
allowFullScreen
></iframe>
## Links & Support
- 📘 [Neatlogs Docs](https://docs.neatlogs.com/)
- 🔐 [Dashboard & API Key](https://app.neatlogs.com/)
- 🐦 [Follow on Twitter](https://twitter.com/neatlogs)
- 📧 Contact: hello@neatlogs.com
- 🛠 [GitHub SDK](https://github.com/NeatLogs/neatlogs)
## TL;DR
With just:
```bash
pip install neatlogs
```
```python
import neatlogs
neatlogs.init("YOUR_API_KEY")
```
You can now capture, understand, share, and act on your CrewAI agent runs in seconds.
No setup overhead. Full trace transparency. Full team collaboration.

View File

@@ -25,6 +25,10 @@ Observability is crucial for understanding how your CrewAI agents perform, ident
Session replays, metrics, and monitoring for agent development and production.
</Card>
<Card title="LangDB" icon="database" href="/en/observability/langdb">
End-to-end tracing for CrewAI workflows with automatic agent interaction capture.
</Card>
<Card title="OpenLIT" icon="magnifying-glass-chart" href="/en/observability/openlit">
OpenTelemetry-native monitoring with cost tracking and performance analytics.
</Card>

View File

@@ -44,6 +44,14 @@ These tools enable your agents to search the web, research topics, and find info
<Card title="YouTube Video Search" icon="play" href="/en/tools/search-research/youtubevideosearchtool">
Find and analyze YouTube videos by topic, keyword, or criteria.
</Card>
<Card title="Tavily Search Tool" icon="magnifying-glass" href="/en/tools/search-research/tavilysearchtool">
Comprehensive web search using Tavily's AI-powered search API.
</Card>
<Card title="Tavily Extractor Tool" icon="file-text" href="/en/tools/search-research/tavilyextractortool">
Extract structured content from web pages using the Tavily API.
</Card>
</CardGroup>
## **Common Use Cases**
@@ -55,17 +63,19 @@ These tools enable your agents to search the web, research topics, and find info
- **Academic Research**: Find scholarly articles and technical papers
```python
from crewai_tools import SerperDevTool, GitHubSearchTool, YoutubeVideoSearchTool
from crewai_tools import SerperDevTool, GitHubSearchTool, YoutubeVideoSearchTool, TavilySearchTool, TavilyExtractorTool
# Create research tools
web_search = SerperDevTool()
code_search = GitHubSearchTool()
video_research = YoutubeVideoSearchTool()
tavily_search = TavilySearchTool()
content_extractor = TavilyExtractorTool()
# Add to your agent
agent = Agent(
role="Research Analyst",
tools=[web_search, code_search, video_research],
tools=[web_search, code_search, video_research, tavily_search, content_extractor],
goal="Gather comprehensive information on any topic"
)
```

View File

@@ -6,10 +6,6 @@ icon: google
# `SerperDevTool`
<Note>
We are still working on improving tools, so there might be unexpected behavior or changes in the future.
</Note>
## Description
This tool is designed to perform a semantic search for a specified query from a text's content across the internet. It utilizes the [serper.dev](https://serper.dev) API
@@ -17,6 +13,12 @@ to fetch and display the most relevant search results based on the query provide
## Installation
To effectively use the `SerperDevTool`, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` package is installed in your Python environment.
2. **API Key Acquisition**: Acquire a `serper.dev` API key by registering for a free account at `serper.dev`.
3. **Environment Configuration**: Store your obtained API key in an environment variable named `SERPER_API_KEY` to facilitate its use by the tool.
To incorporate this tool into your project, follow the installation instructions below:
```shell
@@ -34,14 +36,6 @@ from crewai_tools import SerperDevTool
tool = SerperDevTool()
```
## Steps to Get Started
To effectively use the `SerperDevTool`, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` package is installed in your Python environment.
2. **API Key Acquisition**: Acquire a `serper.dev` API key by registering for a free account at `serper.dev`.
3. **Environment Configuration**: Store your obtained API key in an environment variable named `SERPER_API_KEY` to facilitate its use by the tool.
## Parameters
The `SerperDevTool` comes with several parameters that will be passed to the API:

View File

@@ -0,0 +1,139 @@
---
title: "Tavily Extractor Tool"
description: "Extract structured content from web pages using the Tavily API"
icon: "file-text"
---
The `TavilyExtractorTool` allows CrewAI agents to extract structured content from web pages using the Tavily API. It can process single URLs or lists of URLs and provides options for controlling the extraction depth and including images.
## Installation
To use the `TavilyExtractorTool`, you need to install the `tavily-python` library:
```shell
pip install 'crewai[tools]' tavily-python
```
You also need to set your Tavily API key as an environment variable:
```bash
export TAVILY_API_KEY='your-tavily-api-key'
```
## Example Usage
Here's how to initialize and use the `TavilyExtractorTool` within a CrewAI agent:
```python
import os
from crewai import Agent, Task, Crew
from crewai_tools import TavilyExtractorTool
# Ensure TAVILY_API_KEY is set in your environment
# os.environ["TAVILY_API_KEY"] = "YOUR_API_KEY"
# Initialize the tool
tavily_tool = TavilyExtractorTool()
# Create an agent that uses the tool
extractor_agent = Agent(
role='Web Content Extractor',
goal='Extract key information from specified web pages',
backstory='You are an expert at extracting relevant content from websites using the Tavily API.',
tools=[tavily_tool],
verbose=True
)
# Define a task for the agent
extract_task = Task(
description='Extract the main content from the URL https://example.com using basic extraction depth.',
expected_output='A JSON string containing the extracted content from the URL.',
agent=extractor_agent
)
# Create and run the crew
crew = Crew(
agents=[extractor_agent],
tasks=[extract_task],
verbose=True
)
result = crew.kickoff()
print(result)
```
## Configuration Options
The `TavilyExtractorTool` accepts the following arguments:
- `urls` (Union[List[str], str]): **Required**. A single URL string or a list of URL strings to extract data from.
- `include_images` (Optional[bool]): Whether to include images in the extraction results. Defaults to `False`.
- `extract_depth` (Literal["basic", "advanced"]): The depth of extraction. Use `"basic"` for faster, surface-level extraction or `"advanced"` for more comprehensive extraction. Defaults to `"basic"`.
- `timeout` (int): The maximum time in seconds to wait for the extraction request to complete. Defaults to `60`.
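The options above can be checked before they reach the tool. The following is a minimal sketch of that validation, assuming only the argument names, types, and defaults listed here. `normalize_extract_args` is a hypothetical helper, not part of `crewai_tools`.

```python
from typing import Dict, List, Union

VALID_DEPTHS = ("basic", "advanced")


def normalize_extract_args(
    urls: Union[str, List[str]],
    extract_depth: str = "basic",
    include_images: bool = False,
    timeout: int = 60,
) -> Dict[str, object]:
    """Hypothetical helper: validate extraction arguments and return them as a dict."""
    # Accept either a single URL string or a list of URL strings.
    url_list = [urls] if isinstance(urls, str) else list(urls)
    if not url_list:
        raise ValueError("at least one URL is required")
    if extract_depth not in VALID_DEPTHS:
        raise ValueError(f"extract_depth must be one of {VALID_DEPTHS}")
    if timeout <= 0:
        raise ValueError("timeout must be a positive number of seconds")
    return {
        "urls": url_list,
        "extract_depth": extract_depth,
        "include_images": include_images,
        "timeout": timeout,
    }
```

Normalizing `urls` to a list up front means downstream code only has to handle one shape.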
## Advanced Usage
### Multiple URLs with Advanced Extraction
```python
# Configure the tool with custom parameters
custom_extractor = TavilyExtractorTool(
    extract_depth='advanced',
    include_images=True,
    timeout=120
)

agent_with_custom_tool = Agent(
    role="Advanced Content Extractor",
    goal="Extract comprehensive content with images",
    backstory="You specialize in thorough content extraction from web pages.",
    tools=[custom_extractor]
)

# Example with multiple URLs and advanced extraction
multi_extract_task = Task(
    description='Extract content from https://example.com and https://anotherexample.org using advanced extraction.',
    expected_output='A JSON string containing the extracted content from both URLs.',
    agent=agent_with_custom_tool
)
```
### Tool Parameters
You can customize the tool's behavior by setting parameters during initialization:
```python
# Initialize with custom configuration
extractor_tool = TavilyExtractorTool(
extract_depth='advanced', # More comprehensive extraction
include_images=True, # Include image results
timeout=90 # Custom timeout
)
```
## Features
- **Single or Multiple URLs**: Extract content from one URL or process multiple URLs in a single request
- **Configurable Depth**: Choose between basic (fast) and advanced (comprehensive) extraction modes
- **Image Support**: Optionally include images in the extraction results
- **Structured Output**: Returns well-formatted JSON containing the extracted content
- **Error Handling**: Robust handling of network timeouts and extraction errors
## Response Format
The tool returns a JSON string representing the structured data extracted from the provided URL(s). The exact structure depends on the content of the pages and the `extract_depth` used.
Common response elements include:
- **Title**: The page title
- **Content**: Main text content of the page
- **Images**: Image URLs and metadata (when `include_images=True`)
- **Metadata**: Additional page information like author, description, etc.
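Because the tool returns a JSON string, the first step in consuming it is usually `json.loads`. The sketch below maps each extracted URL to its text. The field names `results`, `url`, and `raw_content` are assumptions about the payload shape and are used here for illustration only; inspect a real response before relying on them.

```python
import json
from typing import Dict

# Illustrative payload; the field names are assumptions, not a documented contract.
SAMPLE_RESPONSE = json.dumps({
    "results": [{"url": "https://example.com", "raw_content": "Example Domain ..."}],
    "failed_results": [{"url": "https://bad.example", "error": "timeout"}],
})


def content_by_url(raw: str) -> Dict[str, str]:
    """Map each successfully extracted URL to its text content."""
    data = json.loads(raw)
    return {
        item["url"]: item.get("raw_content", "")
        for item in data.get("results", [])
        if "url" in item
    }
```

Using `.get()` with defaults keeps the parser tolerant of missing keys, which matters since the structure varies with page content and `extract_depth`.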
## Use Cases
- **Content Analysis**: Extract and analyze content from competitor websites
- **Research**: Gather structured data from multiple sources for analysis
- **Content Migration**: Extract content from existing websites for migration
- **Monitoring**: Regular extraction of content for change detection
- **Data Collection**: Systematic extraction of information from web sources
Refer to the [Tavily API documentation](https://docs.tavily.com/docs/tavily-api/python-sdk#extract) for detailed information about the response structure and available options.

View File

@@ -0,0 +1,122 @@
---
title: "Tavily Search Tool"
description: "Perform comprehensive web searches using the Tavily Search API"
icon: "magnifying-glass"
---
The `TavilySearchTool` provides an interface to the Tavily Search API, enabling CrewAI agents to perform comprehensive web searches. It allows for specifying search depth, topics, time ranges, included/excluded domains, and whether to include direct answers, raw content, or images in the results.
## Installation
To use the `TavilySearchTool`, you need to install the `tavily-python` library:
```shell
pip install 'crewai[tools]' tavily-python
```
## Environment Variables
Ensure your Tavily API key is set as an environment variable:
```bash
export TAVILY_API_KEY='your_tavily_api_key'
```
## Example Usage
Here's how to initialize and use the `TavilySearchTool` within a CrewAI agent:
```python
import os
from crewai import Agent, Task, Crew
from crewai_tools import TavilySearchTool
# Ensure the TAVILY_API_KEY environment variable is set
# os.environ["TAVILY_API_KEY"] = "YOUR_TAVILY_API_KEY"
# Initialize the tool
tavily_tool = TavilySearchTool()
# Create an agent that uses the tool
researcher = Agent(
role='Market Researcher',
goal='Find information about the latest AI trends',
backstory='An expert market researcher specializing in technology.',
tools=[tavily_tool],
verbose=True
)
# Create a task for the agent
research_task = Task(
description='Search for the top 3 AI trends in 2024.',
expected_output='A JSON report summarizing the top 3 AI trends found.',
agent=researcher
)
# Form the crew and kick it off
crew = Crew(
agents=[researcher],
tasks=[research_task],
verbose=True
)
result = crew.kickoff()
print(result)
```
## Configuration Options
The `TavilySearchTool` accepts the following arguments during initialization or when calling the `run` method:
- `query` (str): **Required**. The search query string.
- `search_depth` (Literal["basic", "advanced"], optional): The depth of the search. Defaults to `"basic"`.
- `topic` (Literal["general", "news", "finance"], optional): The topic to focus the search on. Defaults to `"general"`.
- `time_range` (Literal["day", "week", "month", "year"], optional): The time range for the search. Defaults to `None`.
- `days` (int, optional): The number of days to search back. Relevant if `time_range` is not set. Defaults to `7`.
- `max_results` (int, optional): The maximum number of search results to return. Defaults to `5`.
- `include_domains` (Sequence[str], optional): A list of domains to prioritize in the search. Defaults to `None`.
- `exclude_domains` (Sequence[str], optional): A list of domains to exclude from the search. Defaults to `None`.
- `include_answer` (Union[bool, Literal["basic", "advanced"]], optional): Whether to include a direct answer synthesized from the search results. Defaults to `False`.
- `include_raw_content` (bool, optional): Whether to include the raw HTML content of the searched pages. Defaults to `False`.
- `include_images` (bool, optional): Whether to include image results. Defaults to `False`.
- `timeout` (int, optional): The request timeout in seconds. Defaults to `60`.
## Advanced Usage
You can configure the tool with custom parameters:
```python
# Example: Initialize with specific parameters
custom_tavily_tool = TavilySearchTool(
search_depth='advanced',
max_results=10,
include_answer=True
)
# The agent uses the tool with these defaults
agent_with_custom_tool = Agent(
    role="Advanced Researcher",
    goal="Conduct detailed research with comprehensive results",
    backstory="A researcher who favors depth and well-sourced answers.",
    tools=[custom_tavily_tool]
)
```
## Features
- **Comprehensive Search**: Access to Tavily's powerful search index
- **Configurable Depth**: Choose between basic and advanced search modes
- **Topic Filtering**: Focus searches on general, news, or finance topics
- **Time Range Control**: Limit results to specific time periods
- **Domain Control**: Include or exclude specific domains
- **Direct Answers**: Get synthesized answers from search results
- **Content Filtering**: Prevent context window issues with automatic content truncation
## Response Format
The tool returns search results as a JSON string containing:
- Search results with titles, URLs, and content snippets
- Optional direct answers to queries
- Optional image results
- Optional raw HTML content (when enabled)
Content for each result is automatically truncated to prevent context window issues while maintaining the most relevant information.
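To illustrate the idea of per-result truncation, here is a minimal sketch that caps the `content` field of each result. It is not the tool's actual implementation: the `content` key, the `max_chars` limit, and the function name `truncate_results` are all assumptions made for this example.

```python
from typing import Dict, List


def truncate_results(results: List[Dict[str, str]], max_chars: int = 1000) -> List[Dict[str, str]]:
    """Return copies of the results with 'content' capped at max_chars.

    Hypothetical sketch; a missing 'content' key is treated as an empty string.
    """
    truncated = []
    for result in results:
        content = result.get("content", "")
        if len(content) > max_chars:
            # Mark the cut so readers know the text was shortened.
            content = content[:max_chars] + "..."
        truncated.append({**result, "content": content})
    return truncated
```

The function returns new dictionaries rather than editing the input, so the original results stay available if the full text is needed later.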

View File

@@ -0,0 +1,100 @@
---
title: Serper Scrape Website
description: The `SerperScrapeWebsiteTool` is designed to scrape websites and extract clean, readable content using Serper's scraping API.
icon: globe
---
# `SerperScrapeWebsiteTool`
## Description
This tool is designed to scrape website content and extract clean, readable text from any website URL. It utilizes the [serper.dev](https://serper.dev) scraping API to fetch and process web pages, optionally including markdown formatting for better structure and readability.
## Installation
To effectively use the `SerperScrapeWebsiteTool`, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` package is installed in your Python environment.
2. **API Key Acquisition**: Acquire a `serper.dev` API key by registering for an account at `serper.dev`.
3. **Environment Configuration**: Store your obtained API key in an environment variable named `SERPER_API_KEY` to facilitate its use by the tool.
To incorporate this tool into your project, follow the installation instructions below:
```shell
pip install 'crewai[tools]'
```
## Example
The following example demonstrates how to initialize the tool and scrape a website:
```python Code
from crewai_tools import SerperScrapeWebsiteTool
# Initialize the tool for website scraping capabilities
tool = SerperScrapeWebsiteTool()
# Scrape a website with markdown formatting
result = tool.run(url="https://example.com", include_markdown=True)
```
## Arguments
The `SerperScrapeWebsiteTool` accepts the following arguments:
- **url**: Required. The URL of the website to scrape.
- **include_markdown**: Optional. Whether to include markdown formatting in the scraped content. Defaults to `True`.
## Example with Parameters
Here is an example demonstrating how to use the tool with different parameters:
```python Code
from crewai_tools import SerperScrapeWebsiteTool
tool = SerperScrapeWebsiteTool()
# Scrape with markdown formatting (default)
markdown_result = tool.run(
url="https://docs.crewai.com",
include_markdown=True
)
# Scrape without markdown formatting for plain text
plain_result = tool.run(
url="https://docs.crewai.com",
include_markdown=False
)
print("Markdown formatted content:")
print(markdown_result)
print("\nPlain text content:")
print(plain_result)
```
## Use Cases
The `SerperScrapeWebsiteTool` is particularly useful for:
- **Content Analysis**: Extract and analyze website content for research purposes
- **Data Collection**: Gather structured information from web pages
- **Documentation Processing**: Convert web-based documentation into readable formats
- **Competitive Analysis**: Scrape competitor websites for market research
- **Content Migration**: Extract content from existing websites for migration purposes
## Error Handling
The tool includes comprehensive error handling for:
- **Network Issues**: Handles connection timeouts and network errors gracefully
- **API Errors**: Provides detailed error messages for API-related issues
- **Invalid URLs**: Validates and reports issues with malformed URLs
- **Authentication**: Clear error messages for missing or invalid API keys
## Security Considerations
- Always store your `SERPER_API_KEY` in environment variables, never hardcode it in your source code
- Be mindful of rate limits imposed by the Serper API
- Respect robots.txt and website terms of service when scraping content
- Consider implementing delays between requests for large-scale scraping operations
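The last point can be sketched as a small loop that pauses between consecutive requests. The function name `scrape_with_delay` and its parameters are hypothetical. The `scrape` argument is any callable that takes a URL, for example a lambda wrapping `tool.run(url=..., include_markdown=True)` from the examples above.

```python
import time
from typing import Callable, Dict, Iterable


def scrape_with_delay(
    urls: Iterable[str],
    scrape: Callable[[str], str],
    delay_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, str]:
    """Call scrape(url) for each URL, pausing between consecutive requests."""
    results: Dict[str, str] = {}
    for index, url in enumerate(urls):
        if index > 0:
            # No pause before the first request, one pause before each later one.
            sleep(delay_seconds)
        try:
            results[url] = scrape(url)
        except Exception as exc:
            # Record the failure and continue with the remaining URLs.
            results[url] = f"ERROR: {exc}"
    return results
```

A typical call might look like `scrape_with_delay(urls, lambda u: tool.run(url=u, include_markdown=True), delay_seconds=2.0)`. The injectable `sleep` parameter exists so the loop can be tested without waiting.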

Binary files added (contents not shown):

- docs/images/langdb-1.png (127 KiB)
- docs/images/langdb-2.png (117 KiB)
- docs/images/neatlogs-1.png (222 KiB)
- docs/images/neatlogs-2.png (329 KiB)
- docs/images/neatlogs-3.png (590 KiB)
- docs/images/neatlogs-4.png (216 KiB)
- docs/images/neatlogs-5.png (277 KiB)

View File

@@ -76,6 +76,7 @@ Exemplo:
crewai train -n 10 -f my_training_data.pkl
```
```python
# Example of using the train command programmatically
n_iterations = 2
inputs = {"topic": "CrewAI Training"}
@@ -83,12 +84,13 @@ filename = "seu_modelo.pkl"
try:
    # SuaCrew stands for your own crew class
    SuaCrew().crew().train(
        n_iterations=n_iterations,
        inputs=inputs,
        filename=filename
    )
except Exception as e:
    raise Exception(f"An error occurred while training the crew: {e}")
```
### 4. Replay
@@ -101,7 +103,7 @@ crewai replay [OPTIONS]
- `-t, --task_id TEXT`: Replays the crew from this task ID, including all subsequent tasks
Example:
```shell Terminal
```shell Terminal
crewai replay -t task_123456
```
@@ -147,7 +149,7 @@ crewai test [OPTIONS]
- `-m, --model TEXT`: LLM model used to run the tests on the crew (default: "gpt-4o-mini")
Example:
```shell Terminal
```shell Terminal
crewai test -n 5 -m gpt-3.5-turbo
```
@@ -201,10 +203,7 @@ def crew(self) -> Crew:
Deploy the crew or flow to [CrewAI Enterprise](https://app.crewai.com).
- **Authentication**: You need to be authenticated to deploy to CrewAI Enterprise.
```shell Terminal
crewai signup
```
If you already have an account, you can log in with:
You can log in or create an account with:
```shell Terminal
crewai login
```
@@ -251,7 +250,7 @@ Você deve estar autenticado no CrewAI Enterprise para usar estes comandos de ge
- **Deploy the Crew**: Once authenticated, you can deploy your crew or flow to CrewAI Enterprise.
```shell Terminal
crewai deploy push
```
```
- Starts the deployment process on the CrewAI Enterprise platform.
- After a successful start, the message `Deployment created successfully!` is displayed along with the deployment name and a unique Deployment ID (UUID).
@@ -324,4 +323,83 @@ Ao escolher um provedor, o CLI solicitará que você informe o nome da chave e a
See the following link for each provider's key name:
* [LiteLLM Providers](https://docs.litellm.ai/docs/providers)
* [LiteLLM Providers](https://docs.litellm.ai/docs/providers)
### 12. Configuration Management
Manage the CLI settings for CrewAI.
```shell Terminal
crewai config [COMMAND] [OPTIONS]
```
#### Commands:
- `list`: Display all CLI configuration parameters
```shell Terminal
crewai config list
```
- `set`: Set a CLI configuration parameter
```shell Terminal
crewai config set <key> <value>
```
- `reset`: Reset all CLI configuration parameters to their default values
```shell Terminal
crewai config reset
```
#### Available Configuration Parameters
- `enterprise_base_url`: Base URL of the CrewAI Enterprise instance
- `oauth2_provider`: OAuth2 provider used for authentication (e.g., workos, okta, auth0)
- `oauth2_audience`: OAuth2 audience value, typically used to identify the target API or resource
- `oauth2_client_id`: OAuth2 client ID issued by the provider, used during authentication requests
- `oauth2_domain`: OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens
#### Examples
Display the current configuration:
```shell Terminal
crewai config list
```
Example output:
```
CrewAI CLI Configuration
┏━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Setting ┃ Value ┃ Description ┃
┡━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ enterprise_base_url│ https://app.crewai.com │ Base URL of the CrewAI Enterprise instance │
│ org_name │ Not set │ Name of the currently active organization │
│ org_uuid │ Not set │ UUID of the currently active organization │
│ oauth2_provider │ workos │ OAuth2 provider used for authentication (e.g., workos, okta, auth0). │
│ oauth2_audience │ client_01YYY │ OAuth2 audience value, typically used to identify the target API or resource. │
│ oauth2_client_id │ client_01XXX │ OAuth2 client ID issued by the provider, used during authentication requests. │
│ oauth2_domain │ login.crewai.com │ OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens. │
```
Set the enterprise base URL:
```shell Terminal
crewai config set enterprise_base_url https://minha-empresa.crewai.com
```
Set the OAuth2 provider:
```shell Terminal
crewai config set oauth2_provider auth0
```
Set the OAuth2 domain:
```shell Terminal
crewai config set oauth2_domain minha-empresa.auth0.com
```
Reset all settings to their defaults:
```shell Terminal
crewai config reset
```
<Note>
Settings are stored in `~/.config/crewai/settings.json`. Some settings, such as the organization name and UUID, are read-only and are managed through the authentication and organization commands. Settings related to the tool repository are hidden and cannot be set directly by the user.
</Note>
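Because the settings live in a plain JSON file, they can also be inspected outside the CLI. The sketch below is a minimal, read-only illustration under stated assumptions: the helper name `load_cli_settings` is hypothetical (it is not part of the CrewAI CLI), and the keys in the file are assumed to mirror the setting names shown by `crewai config list`.

```python
import json
from pathlib import Path

# Default location, taken from the note above.
DEFAULT_SETTINGS_PATH = Path.home() / ".config" / "crewai" / "settings.json"


def load_cli_settings(path: Path = DEFAULT_SETTINGS_PATH) -> dict:
    """Return the CLI settings as a dict, or an empty dict if the file is missing.

    Hypothetical helper for illustration only; it never writes to the file.
    """
    if not path.exists():
        return {}
    with path.open(encoding="utf-8") as handle:
        return json.load(handle)
```

Calling `load_cli_settings().get("enterprise_base_url", "Not set")` would then mirror the first row of the table, assuming that key name is what the CLI writes. Changes should still go through `crewai config set`, since some values are managed by the authentication commands.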

View File

@@ -20,8 +20,7 @@ Uma crew no crewAI representa um grupo colaborativo de agentes trabalhando em co
| **Function Calling LLM** _(opcional)_ | `function_calling_llm` | Se definido, a crew utilizará este LLM para invocar funções das ferramentas para todos os agentes da crew. Cada agente pode ter seu próprio LLM, que substitui o LLM da crew para chamadas de função. |
| **Config** _(opcional)_ | `config` | Configurações opcionais para a crew, no formato `Json` ou `Dict[str, Any]`. |
| **Max RPM** _(opcional)_ | `max_rpm` | Número máximo de requisições por minuto que a crew respeita durante a execução. O padrão é `None`. |
| **Memory** _(opcional)_ | `memory` | Utilizada para armazenar memórias de execução (curto prazo, longo prazo, memória de entidade). |
| **Memory Config** _(opcional)_ | `memory_config` | Configuração para o provedor de memória a ser utilizada pela crew. |
| **Memory** _(opcional)_ | `memory` | Utilizada para armazenar memórias de execução (curto prazo, longo prazo, memória de entidade). |
| **Cache** _(opcional)_ | `cache` | Especifica se deve usar cache para armazenar os resultados da execução de ferramentas. O padrão é `True`. |
| **Embedder** _(opcional)_ | `embedder` | Configuração do embedder a ser utilizado pela crew. Atualmente mais usado por memory. O padrão é `{"provider": "openai"}`. |
| **Step Callback** _(opcional)_ | `step_callback` | Uma função chamada após cada etapa de cada agente. Pode ser usada para registrar as ações do agente ou executar outras operações; não sobrescreve o `step_callback` específico do agente. |

View File

@@ -268,7 +268,7 @@ Nesta seção, você encontrará exemplos detalhados que ajudam a selecionar, co
from crewai import LLM
llm = LLM(
model="gemini/gemini-1.5-pro-latest",
model="gemini-1.5-pro-latest", # or vertex_ai/gemini-1.5-pro-latest
temperature=0.7,
vertex_credentials=vertex_credentials_json
)

View File

@@ -9,8 +9,7 @@ icon: database
O framework CrewAI oferece um sistema de memória sofisticado projetado para aprimorar significativamente as capacidades dos agentes de IA. O CrewAI disponibiliza **três abordagens distintas de memória** que atendem a diferentes casos de uso:
1. **Sistema Básico de Memória** - Memória de curto prazo, longo prazo e de entidades integradas
2. **Memória de Usuário** - Memória específica do usuário com integração ao Mem0 (abordagem legada)
3. **Memória Externa** - Provedores de memória externos autônomos (nova abordagem)
2. **Memória Externa** - Provedores de memória externos autônomos
## Componentes do Sistema de Memória
@@ -19,7 +18,7 @@ O framework CrewAI oferece um sistema de memória sofisticado projetado para apr
| **Memória de Curto Prazo** | Armazena temporariamente interações e resultados recentes usando `RAG`, permitindo que os agentes recordem e utilizem informações relevantes ao contexto atual durante as execuções. |
| **Memória de Longo Prazo** | Preserva informações valiosas e aprendizados de execuções passadas, permitindo que os agentes construam e refinem seu conhecimento ao longo do tempo. |
| **Memória de Entidades** | Captura e organiza informações sobre entidades (pessoas, lugares, conceitos) encontradas durante tarefas, facilitando um entendimento mais profundo e o mapeamento de relacionamentos. Utiliza `RAG` para armazenar informações de entidades. |
| **Memória Contextual** | Mantém o contexto das interações combinando `ShortTermMemory`, `LongTermMemory` e `EntityMemory`, auxiliando na coerência e relevância das respostas dos agentes ao longo de uma sequência de tarefas ou conversas. |
| **Memória Contextual** | Mantém o contexto das interações combinando `ShortTermMemory`, `LongTermMemory`, `ExternalMemory` e `EntityMemory`, auxiliando na coerência e relevância das respostas dos agentes ao longo de uma sequência de tarefas ou conversas. |
## 1. Sistema Básico de Memória (Recomendado)
@@ -623,7 +622,7 @@ for provider in providers_to_test:
**Erros de modelo não encontrado:**
```python
# Verifique disponibilidade do modelo
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
configurator = EmbeddingConfigurator()
try:
@@ -684,67 +683,19 @@ print(f"OpenAI: {openai_time:.2f}s")
print(f"Ollama: {ollama_time:.2f}s")
```
## 2. Memória de Usuário com Mem0 (Legado)
## 2. Memória Externa
<Warning>
**Abordagem Legada**: Embora totalmente funcional, esta abordagem é considerada legada. Para novos projetos que exijam memória específica do usuário, considere usar Memória Externa.
</Warning>
A Memória Externa fornece um sistema de memória autônomo que opera independentemente da memória interna da crew. Isso é ideal para provedores de memória especializados ou compartilhamento de memória entre aplicações.
A Memória de Usuário se integra com o [Mem0](https://mem0.ai/) para fornecer memória específica do usuário que persiste entre sessões e se integra ao sistema de memória contextual da crew.
### Pré-requisitos
```bash
pip install mem0ai
```
### Configuração Mem0 na Nuvem
### Memória Externa Básica com Mem0
```python
import os
from crewai import Crew, Process
from crewai import Agent, Crew, Process, Task
from crewai.memory.external.external_memory import ExternalMemory
# Defina sua chave de API do Mem0
os.environ["MEM0_API_KEY"] = "m0-your-api-key"
crew = Crew(
agents=[...],
tasks=[...],
memory=True, # Necessário para integração com a memória contextual
memory_config={
"provider": "mem0",
"config": {"user_id": "john"},
"user_memory": {} # Obrigatório - inicializa a memória de usuário
},
process=Process.sequential,
verbose=True
)
```
### Configuração Avançada Mem0
```python
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
memory_config={
"provider": "mem0",
"config": {
"user_id": "john",
"org_id": "my_org_id", # Opcional
"project_id": "my_project_id", # Opcional
"api_key": "custom-api-key" # Opcional - sobrescreve variável de ambiente
},
"user_memory": {}
}
)
```
### Configuração Mem0 Local
```python
crew = Crew(
agents=[...],
tasks=[...],
memory=True,
memory_config={
# Create external memory instance with local Mem0 Configuration
external_memory = ExternalMemory(
embedder_config={
"provider": "mem0",
"config": {
"user_id": "john",
@@ -761,37 +712,60 @@ crew = Crew(
"provider": "openai",
"config": {"api_key": "your-api-key", "model": "text-embedding-3-small"}
}
}
},
"infer": True # Optional defaults to True
},
"user_memory": {}
}
)
```
## 3. Memória Externa (Nova Abordagem)
A Memória Externa fornece um sistema de memória autônomo que opera independentemente da memória interna da crew. Isso é ideal para provedores de memória especializados ou compartilhamento de memória entre aplicações.
### Memória Externa Básica com Mem0
```python
import os
from crewai import Agent, Crew, Process, Task
from crewai.memory.external.external_memory import ExternalMemory
os.environ["MEM0_API_KEY"] = "your-api-key"
# Criar instância de memória externa
external_memory = ExternalMemory(
embedder_config={
"provider": "mem0",
"config": {"user_id": "U-123"}
}
)
crew = Crew(
agents=[...],
tasks=[...],
external_memory=external_memory, # Independente da memória básica
external_memory=external_memory, # Separate from basic memory
process=Process.sequential,
verbose=True
)
```
### Memória Externa Avançada com o Cliente Mem0
Ao usar o Cliente Mem0, você pode personalizar ainda mais a configuração de memória usando parâmetros como "includes", "excludes", "custom_categories", "infer" e "run_id" (apenas para memória de curto prazo).
Você pode encontrar mais detalhes na [documentação do Mem0](https://docs.mem0.ai/).
```python
import os
from crewai import Agent, Crew, Process, Task
from crewai.memory.external.external_memory import ExternalMemory
new_categories = [
{"lifestyle_management_concerns": "Tracks daily routines, habits, hobbies and interests including cooking, time management and work-life balance"},
{"seeking_structure": "Documents goals around creating routines, schedules, and organized systems in various life areas"},
{"personal_information": "Basic information about the user including name, preferences, and personality traits"}
]
os.environ["MEM0_API_KEY"] = "your-api-key"
# Create external memory instance with Mem0 Client
external_memory = ExternalMemory(
embedder_config={
"provider": "mem0",
"config": {
"user_id": "john",
"org_id": "my_org_id", # Optional
"project_id": "my_project_id", # Optional
"api_key": "custom-api-key", # Optional - overrides env var
"run_id": "my_run_id", # Optional - for short-term memory
"includes": "include1", # Optional
"excludes": "exclude1", # Optional
"infer": True, # Optional - defaults to True
"custom_categories": new_categories # Optional - custom categories for user memory
},
}
)
crew = Crew(
agents=[...],
tasks=[...],
external_memory=external_memory, # Separate from basic memory
process=Process.sequential,
verbose=True
)
@@ -830,17 +804,18 @@ crew = Crew(
)
```
## Comparação dos Sistemas de Memória
## 🧠 Comparação dos Sistemas de Memória
| **Categoria** | **Recurso** | **Memória Básica** | **Memória Externa** |
|------------------------|-------------------------------|-------------------------------|----------------------------------|
| **Facilidade de Uso** | Complexidade de Setup | Simples | Média |
| | Integração | Contextual integrada | Autônoma |
| **Persistência** | Armazenamento | Arquivos locais | Customizada / Mem0 |
| | Multi-sessão | ✅ | ✅ |
| **Personalização** | Especificidade do Usuário | ❌ | ✅ |
| | Provedores Customizados | Limitado | Qualquer provedor |
| **Aplicação Recomendada** | Recomendado para | Maioria dos casos | Necessidades especializadas |
| Recurso | Memória Básica | Memória de Usuário (Legado) | Memória Externa |
|---------|---------------|-----------------------------|----------------|
| **Complexidade de Setup** | Simples | Média | Média |
| **Integração** | Contextual integrada | Contextual + específica do usuário | Autônoma |
| **Armazenamento** | Arquivos locais | Mem0 Cloud/Local | Customizada/Mem0 |
| **Multi-sessão** | ✅ | ✅ | ✅ |
| **Especificidade do Usuário** | ❌ | ✅ | ✅ |
| **Provedores Customizados** | Limitado | Apenas Mem0 | Qualquer provedor |
| **Recomendado para** | Maioria dos casos | Projetos legados | Necessidades especializadas |
## Provedores de Embedding Suportados
@@ -989,4 +964,4 @@ crew = Crew(
## Conclusão
Integrar o sistema de memória do CrewAI em seus projetos é simples. Ao aproveitar os componentes e configurações oferecidos,
você rapidamente capacita seus agentes a lembrar, raciocinar e aprender com suas interações, desbloqueando novos níveis de inteligência e capacidade.
você rapidamente capacita seus agentes a lembrar, raciocinar e aprender com suas interações, desbloqueando novos níveis de inteligência e capacidade.

View File

@@ -54,9 +54,11 @@ crew = Crew(
| **Markdown** _(opcional)_ | `markdown` | `Optional[bool]` | Se a tarefa deve instruir o agente a retornar a resposta final formatada em Markdown. O padrão é False. |
| **Config** _(opcional)_ | `config` | `Optional[Dict[str, Any]]` | Parâmetros de configuração específicos da tarefa. |
| **Arquivo de Saída** _(opcional)_| `output_file` | `Optional[str]` | Caminho do arquivo para armazenar a saída da tarefa. |
| **Criar Diretório** _(opcional)_ | `create_directory` | `Optional[bool]` | Se deve criar o diretório para output_file caso não exista. O padrão é True. |
| **Saída JSON** _(opcional)_ | `output_json` | `Optional[Type[BaseModel]]` | Um modelo Pydantic para estruturar a saída em JSON. |
| **Output Pydantic** _(opcional)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | Um modelo Pydantic para a saída da tarefa. |
| **Callback** _(opcional)_ | `callback` | `Optional[Any]` | Função/objeto a ser executado após a conclusão da tarefa. |
| **Guardrail** _(opcional)_ | `guardrail` | `Optional[Callable]` | Função para validar a saída da tarefa antes de prosseguir para a próxima tarefa. |
## Criando Tarefas
@@ -330,9 +332,11 @@ analysis_task = Task(
Guardrails (trilhas de proteção) de tarefas fornecem uma maneira de validar e transformar as saídas das tarefas antes que elas sejam passadas para a próxima tarefa. Esse recurso assegura a qualidade dos dados e oferece feedback aos agentes quando sua saída não atende a critérios específicos.
### Usando Guardrails em Tarefas
Guardrails são implementados como funções Python que contêm lógica de validação customizada, proporcionando controle total sobre o processo de validação e garantindo resultados confiáveis e determinísticos.
Para adicionar um guardrail a uma tarefa, forneça uma função de validação por meio do parâmetro `guardrail`:
### Guardrails Baseados em Função
Para adicionar um guardrail baseado em função a uma tarefa, forneça uma função de validação por meio do parâmetro `guardrail`:
```python Code
from typing import Tuple, Union, Dict, Any
@@ -370,9 +374,7 @@ blog_task = Task(
- Em caso de sucesso: retorna uma tupla `(True, resultado_validado)`
- Em caso de falha: retorna uma tupla `(False, "mensagem de erro explicando a falha")`
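
The return contract above can be illustrated without importing CrewAI. The following is a minimal sketch under stated assumptions: `validate_json_output` is a hypothetical guardrail, and the task output is assumed to expose its text through a `raw` attribute. A small stand-in class (`FakeTaskOutput`, also hypothetical) is used so the snippet runs on its own.

```python
import json
from dataclasses import dataclass
from typing import Any, Tuple


@dataclass
class FakeTaskOutput:
    """Stand-in for the task output object; only the `raw` text is modeled."""

    raw: str


def validate_json_output(result: Any) -> Tuple[bool, Any]:
    """Hypothetical guardrail: accept the output only if it parses as a JSON object."""
    try:
        data = json.loads(result.raw)
    except json.JSONDecodeError as exc:
        # Failure: return False plus a message explaining what went wrong.
        return (False, f"Output is not valid JSON: {exc}")
    if not isinstance(data, dict):
        return (False, "Output must be a JSON object")
    # Success: return True plus the validated (here, parsed) result.
    return (True, data)
```

A function shaped like this would be passed through the `guardrail` parameter of a task. The error string is what gives the agent something concrete to correct on a retry.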
### LLMGuardrail
A classe `LLMGuardrail` oferece um mecanismo robusto para validação das saídas das tarefas.
### Melhores Práticas de Tratamento de Erros
@@ -823,26 +825,7 @@ task = Task(
)
```
#### Use uma abordagem no-code para validação
```python Code
from crewai import Task
task = Task(
description="Gerar dados em JSON",
expected_output="Objeto JSON válido",
guardrail="Garanta que a resposta é um objeto JSON válido"
)
```
#### Usando YAML
```yaml
research_task:
...
guardrail: garanta que cada bullet tenha no mínimo 100 palavras
...
```
```python Code
@CrewBase
@@ -958,21 +941,87 @@ task = Task(
## Criando Diretórios ao Salvar Arquivos
Agora é possível especificar se uma tarefa deve criar diretórios ao salvar sua saída em arquivo. Isso é útil para organizar outputs e garantir que os caminhos estejam corretos.
O parâmetro `create_directory` controla se o CrewAI deve criar automaticamente diretórios ao salvar saídas de tarefas em arquivos. Este recurso é particularmente útil para organizar outputs e garantir que os caminhos de arquivos estejam estruturados corretamente, especialmente ao trabalhar com hierarquias de projetos complexas.
### Comportamento Padrão
Por padrão, `create_directory=True`, o que significa que o CrewAI criará automaticamente qualquer diretório ausente no caminho do arquivo de saída:
```python Code
# ...
save_output_task = Task(
description='Salve o resumo das notícias de IA em um arquivo',
expected_output='Arquivo salvo com sucesso',
agent=research_agent,
tools=[file_save_tool],
output_file='outputs/ai_news_summary.txt',
create_directory=True
# Comportamento padrão - diretórios são criados automaticamente
report_task = Task(
description='Gerar um relatório abrangente de análise de mercado',
expected_output='Uma análise detalhada de mercado com gráficos e insights',
agent=analyst_agent,
output_file='reports/2025/market_analysis.md', # Cria 'reports/2025/' se não existir
markdown=True
)
```
#...
### Desabilitando a Criação de Diretórios
Se você quiser evitar a criação automática de diretórios e garantir que o diretório já exista, defina `create_directory=False`:
```python Code
# Modo estrito - o diretório já deve existir
strict_output_task = Task(
description='Salvar dados críticos que requerem infraestrutura existente',
expected_output='Dados salvos em localização pré-configurada',
agent=data_agent,
output_file='secure/vault/critical_data.json',
create_directory=False # Gerará RuntimeError se 'secure/vault/' não existir
)
```
### Configuração YAML
Você também pode configurar este comportamento em suas definições de tarefas YAML:
```yaml tasks.yaml
analysis_task:
  description: >
    Gerar análise financeira trimestral
  expected_output: >
    Um relatório financeiro abrangente com insights trimestrais
  agent: financial_analyst
  output_file: reports/quarterly/q4_2024_analysis.pdf
  create_directory: true  # Criar automaticamente o diretório 'reports/quarterly/'

audit_task:
  description: >
    Realizar auditoria de conformidade e salvar no diretório de auditoria existente
  expected_output: >
    Um relatório de auditoria de conformidade
  agent: auditor
  output_file: audit/compliance_report.md
  create_directory: false  # O diretório já deve existir
```
### Casos de Uso
**Criação Automática de Diretórios (`create_directory=True`):**
- Ambientes de desenvolvimento e prototipagem
- Geração dinâmica de relatórios com pastas baseadas em datas
- Fluxos de trabalho automatizados onde a estrutura de diretórios pode variar
- Aplicações multi-tenant com pastas específicas do usuário
**Gerenciamento Manual de Diretórios (`create_directory=False`):**
- Ambientes de produção com controles rígidos do sistema de arquivos
- Aplicações sensíveis à segurança onde diretórios devem ser pré-configurados
- Sistemas com requisitos específicos de permissão
- Ambientes de conformidade onde a criação de diretórios é auditada
### Tratamento de Erros
Quando `create_directory=False` e o diretório não existe, o CrewAI gerará um `RuntimeError`:
```python Code
try:
    result = crew.kickoff()
except RuntimeError as e:
    # Tratar erro de diretório ausente
    print(f"Falha na criação do diretório: {e}")
    # Criar diretório manualmente ou usar local alternativo
```
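
When `create_directory=False` is used, one way to avoid the `RuntimeError` is to check the target directory before kickoff. The sketch below relies only on the standard library; `output_dir_exists` is a hypothetical helper and not part of CrewAI.

```python
from pathlib import Path


def output_dir_exists(output_file: str) -> bool:
    """Return True if the parent directory of `output_file` already exists.

    Hypothetical pre-flight check; it never creates anything on disk.
    """
    return Path(output_file).expanduser().resolve().parent.is_dir()
```

For example, `output_dir_exists("secure/vault/critical_data.json")` could be evaluated before `crew.kickoff()`, so a missing directory is reported as a configuration problem rather than as a failure in the middle of a run.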
Veja o vídeo abaixo para aprender como utilizar saídas estruturadas no CrewAI:

View File

@@ -44,6 +44,19 @@ A classe `MCPServerAdapter` da `crewai-tools` é a principal forma de conectar-s
O uso de um gerenciador de contexto Python (`with`) é a **abordagem recomendada** para o `MCPServerAdapter`. Ele lida automaticamente com a abertura e o fechamento da conexão com o servidor MCP.
## Configuração de Conexão
O `MCPServerAdapter` suporta várias opções de configuração para personalizar o comportamento da conexão:
- **`connect_timeout`** (opcional): Tempo máximo em segundos para aguardar o estabelecimento de uma conexão com o servidor MCP. O padrão é 30 segundos se não especificado. Isso é particularmente útil para servidores remotos que podem ter tempos de resposta variáveis.
```python
# Exemplo com timeout personalizado para conexão
with MCPServerAdapter(server_params, connect_timeout=60) as tools:
    # A conexão terá timeout após 60 segundos se não estabelecida
    pass
```
```python
from crewai import Agent
from crewai_tools import MCPServerAdapter
@@ -70,7 +83,7 @@ server_params = {
}
# Exemplo de uso (descomente e adapte após definir server_params):
with MCPServerAdapter(server_params) as mcp_tools:
with MCPServerAdapter(server_params, connect_timeout=60) as mcp_tools:
print(f"Available tools: {[tool.name for tool in mcp_tools]}")
meu_agente = Agent(
@@ -88,7 +101,7 @@ Este padrão geral mostra como integrar ferramentas. Para exemplos específicos
## Filtrando Ferramentas
```python
with MCPServerAdapter(server_params) as mcp_tools:
with MCPServerAdapter(server_params, connect_timeout=60) as mcp_tools:
print(f"Available tools: {[tool.name for tool in mcp_tools]}")
meu_agente = Agent(

View File

@@ -0,0 +1,286 @@
---
title: Integração LangDB
description: Governe, proteja e otimize seus fluxos de trabalho CrewAI com LangDB AI Gateway—acesse mais de 350 modelos, roteamento automático, otimização de custos e observabilidade completa.
icon: database
---
# Introdução
[LangDB AI Gateway](https://langdb.ai) provides OpenAI-compatible APIs for connecting to multiple large language models and serves as an observability platform that makes it straightforward to trace CrewAI workflows end to end, with access to more than 350 language models. With a single `init()` call, all agent interactions, task executions, and LLM calls are captured, providing comprehensive observability and production-ready AI infrastructure for your applications.
<Frame caption="Exemplo de Rastreamento CrewAI LangDB">
<img src="/images/langdb-1.png" alt="Exemplo de rastreamento CrewAI LangDB" />
</Frame>
**Confira:** [Ver o exemplo de trace ao vivo](https://app.langdb.ai/sharing/threads/3becbfed-a1be-ae84-ea3c-4942867a3e22)
## Recursos
### Capacidades do AI Gateway
- **Acesso a mais de 350 LLMs**: Conecte-se a todos os principais modelos de linguagem através de uma única integração
- **Modelos Virtuais**: Crie configurações de modelo personalizadas com parâmetros específicos e regras de roteamento
- **MCP Virtual**: Habilite compatibilidade e integração com sistemas MCP (Model Context Protocol) para comunicação aprimorada de agentes
- **Guardrails**: Implemente medidas de segurança e controles de conformidade para comportamento de agentes
### Observabilidade e Rastreamento
- **Rastreamento Automático**: Uma única chamada `init()` captura todas as interações CrewAI
- **Visibilidade Ponta a Ponta**: Monitore fluxos de trabalho de agentes do início ao fim
- **Rastreamento de Uso de Ferramentas**: Rastreie quais ferramentas os agentes usam e seus resultados
- **Monitoramento de Chamadas de Modelo**: Insights detalhados sobre interações LLM
- **Análise de Performance**: Monitore latência, uso de tokens e custos
- **Suporte a Depuração**: Execução passo a passo para solução de problemas
- **Monitoramento em Tempo Real**: Dashboard de traces e métricas ao vivo
## Instruções de Configuração
<Steps>
<Step title="Instalar LangDB">
Instale o cliente LangDB com flag de recurso CrewAI:
```bash
pip install 'pylangdb[crewai]'
```
</Step>
<Step title="Definir Variáveis de Ambiente">
Configure suas credenciais LangDB:
```bash
export LANGDB_API_KEY="<sua_chave_api_langdb>"
export LANGDB_PROJECT_ID="<seu_id_projeto_langdb>"
export LANGDB_API_BASE_URL='https://api.us-east-1.langdb.ai'
```
</Step>
<Step title="Inicializar Rastreamento">
Importe e inicialize LangDB antes de configurar seu código CrewAI:
```python
from pylangdb.crewai import init
# Inicializar LangDB
init()
```
</Step>
<Step title="Configurar CrewAI com LangDB">
Configure seu LLM com cabeçalhos LangDB:
```python
from crewai import Agent, Task, Crew, LLM
import os
# Configurar LLM com cabeçalhos LangDB
llm = LLM(
model="openai/gpt-4o", # Substitua pelo modelo que você quer usar
api_key=os.getenv("LANGDB_API_KEY"),
base_url=os.getenv("LANGDB_API_BASE_URL"),
extra_headers={"x-project-id": os.getenv("LANGDB_PROJECT_ID")}
)
```
</Step>
</Steps>
## Exemplo de Início Rápido
Aqui está um exemplo simples para começar com LangDB e CrewAI:
```python
import os
from pylangdb.crewai import init

# Initialize LangDB before any CrewAI imports
init()

from crewai import Agent, Task, Crew, LLM


def create_llm(model):
    return LLM(
        model=model,
        api_key=os.environ.get("LANGDB_API_KEY"),
        base_url=os.environ.get("LANGDB_API_BASE_URL"),
        extra_headers={"x-project-id": os.environ.get("LANGDB_PROJECT_ID")}
    )


# Define your agent
researcher = Agent(
    role="Especialista em Pesquisa",
    goal="Pesquisar tópicos minuciosamente",
    backstory="Pesquisador especialista com habilidades em encontrar informações",
    llm=create_llm("openai/gpt-4o"),  # Replace with the model you want to use
    verbose=True
)

# Create a task
task = Task(
    description="Pesquise o tópico dado e forneça um resumo abrangente",
    agent=researcher,
    expected_output="Resumo de pesquisa detalhado com principais descobertas"
)

# Create and run the crew
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
print(result)
```
## Exemplo Completo: Agente de Pesquisa e Planejamento
Este exemplo abrangente demonstra um fluxo de trabalho multi-agente com capacidades de pesquisa e planejamento.
### Pré-requisitos
```bash
pip install crewai 'pylangdb[crewai]' crewai_tools setuptools python-dotenv
```
### Configuração do Ambiente
```bash
# Credenciais LangDB
export LANGDB_API_KEY="<sua_chave_api_langdb>"
export LANGDB_PROJECT_ID="<seu_id_projeto_langdb>"
export LANGDB_API_BASE_URL='https://api.us-east-1.langdb.ai'
# Chaves API adicionais (opcional)
export SERPER_API_KEY="<sua_chave_api_serper>" # Para capacidades de busca na web
```
### Implementação Completa
```python
#!/usr/bin/env python3

import os
import sys
from pylangdb.crewai import init

init()  # Initialize LangDB before any CrewAI imports

from dotenv import load_dotenv
from crewai import Agent, Task, Crew, Process, LLM
from crewai_tools import SerperDevTool

load_dotenv()


def create_llm(model):
    return LLM(
        model=model,
        api_key=os.environ.get("LANGDB_API_KEY"),
        base_url=os.environ.get("LANGDB_API_BASE_URL"),
        extra_headers={"x-project-id": os.environ.get("LANGDB_PROJECT_ID")}
    )


class ResearchPlanningCrew:
    def __init__(self, topic: str):
        # The topic is interpolated into the task descriptions
        self.topic = topic

    def researcher(self) -> Agent:
        return Agent(
            role="Especialista em Pesquisa",
            goal="Pesquisar tópicos minuciosamente e compilar informações abrangentes",
            backstory="Pesquisador especialista com habilidades em encontrar e analisar informações de várias fontes",
            tools=[SerperDevTool()],
            llm=create_llm("openai/gpt-4o"),
            verbose=True
        )

    def planner(self) -> Agent:
        return Agent(
            role="Planejador Estratégico",
            goal="Criar planos acionáveis baseados em descobertas de pesquisa",
            backstory="Planejador estratégico que divide desafios complexos em planos executáveis",
            reasoning=True,
            max_reasoning_attempts=3,
            llm=create_llm("openai/anthropic/claude-3.7-sonnet"),
            verbose=True
        )

    def research_task(self, agent: Agent) -> Task:
        return Task(
            description=f"Pesquise {self.topic} minuciosamente e compile informações abrangentes",
            agent=agent,
            expected_output="Relatório de pesquisa abrangente com principais descobertas e insights"
        )

    def planning_task(self, agent: Agent, research_task: Task) -> Task:
        return Task(
            description=f"Crie um plano estratégico para {self.topic} baseado nas descobertas da pesquisa",
            agent=agent,
            expected_output="Plano de execução estratégica com fases, objetivos e etapas acionáveis",
            context=[research_task]
        )

    def crew(self) -> Crew:
        # Build each agent and task once so the planning context
        # points at the research task that actually runs
        researcher = self.researcher()
        planner = self.planner()
        research_task = self.research_task(researcher)
        planning_task = self.planning_task(planner, research_task)
        return Crew(
            agents=[researcher, planner],
            tasks=[research_task, planning_task],
            verbose=True,
            process=Process.sequential
        )


def main():
    topic = sys.argv[1] if len(sys.argv) > 1 else "Inteligência Artificial na Saúde"
    result = ResearchPlanningCrew(topic).crew().kickoff()
    print(result)


if __name__ == "__main__":
    main()
```
### Executando o Exemplo
```bash
python main.py "Soluções de Energia Sustentável"
```
## Visualizando Traces no LangDB
Após executar sua aplicação CrewAI, você pode visualizar traces detalhados no dashboard LangDB:
<Frame caption="Dashboard de Trace LangDB">
<img src="/images/langdb-2.png" alt="Dashboard de trace LangDB mostrando fluxo de trabalho CrewAI" />
</Frame>
### O Que Você Verá
- **Interações de Agentes**: Fluxo completo de conversas de agentes e transferências de tarefas
- **Uso de Ferramentas**: Quais ferramentas foram chamadas, suas entradas e saídas
- **Chamadas de Modelo**: Interações LLM detalhadas com prompts e respostas
- **Métricas de Performance**: Rastreamento de latência, uso de tokens e custos
- **Linha do Tempo de Execução**: Visualização passo a passo de todo o fluxo de trabalho
## Solução de Problemas
### Problemas Comuns
- **Nenhum trace aparecendo**: Certifique-se de que `init()` seja chamado antes de qualquer importação CrewAI
- **Erros de autenticação**: Verifique sua chave API LangDB e ID do projeto
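
Authentication errors are often caused by a missing environment variable. The following sketch checks the three variables from the setup steps before `init()` is called. `missing_langdb_vars` is a hypothetical helper and not part of `pylangdb`; it only inspects the environment and makes no network calls.

```python
import os
from typing import List, Mapping

# Variable names taken from the setup steps in this guide.
REQUIRED_VARS = ("LANGDB_API_KEY", "LANGDB_PROJECT_ID", "LANGDB_API_BASE_URL")


def missing_langdb_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names of required LangDB variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

If the returned list is non-empty, the script can stop with a clear message before any agent runs, which tends to be easier to diagnose than an HTTP authentication failure surfacing later.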
## Recursos
<CardGroup cols={3}>
<Card title="Documentação LangDB" icon="book" href="https://docs.langdb.ai">
Documentação oficial e guias LangDB
</Card>
<Card title="Guias LangDB" icon="graduation-cap" href="https://docs.langdb.ai/guides">
Tutoriais passo a passo para construir agentes de IA
</Card>
<Card title="Exemplos GitHub" icon="github" href="https://github.com/langdb/langdb-samples/tree/main/examples/crewai" >
Exemplos completos de integração CrewAI
</Card>
<Card title="Dashboard LangDB" icon="chart-line" href="https://app.langdb.ai">
Acesse seus traces e análises
</Card>
<Card title="Catálogo de Modelos" icon="list" href="https://app.langdb.ai/models">
Navegue por mais de 350 modelos de linguagem disponíveis
</Card>
<Card title="Recursos Enterprise" icon="building" href="https://docs.langdb.ai/enterprise">
Opções auto-hospedadas e capacidades empresariais
</Card>
</CardGroup>
## Próximos Passos
Este guia cobriu o básico da integração do LangDB AI Gateway com CrewAI. Para aprimorar ainda mais seus fluxos de trabalho de IA, explore:
- **Modelos Virtuais**: Crie configurações de modelo personalizadas com estratégias de roteamento
- **Guardrails e Segurança**: Implemente filtragem de conteúdo e controles de conformidade
- **Implantação em Produção**: Configure fallbacks, tentativas e balanceamento de carga
Para recursos mais avançados e casos de uso, visite a [Documentação LangDB](https://docs.langdb.ai) ou explore o [Catálogo de Modelos](https://app.langdb.ai/models) para descobrir todos os modelos disponíveis.

View File

@@ -25,6 +25,10 @@ A observabilidade é fundamental para entender como seus agentes CrewAI estão d
Replays de sessões, métricas e monitoramento para desenvolvimento e produção de agentes.
</Card>
<Card title="LangDB" icon="database" href="/pt-BR/observability/langdb">
Rastreamento ponta a ponta para fluxos de trabalho CrewAI com captura automática de interações de agentes.
</Card>
<Card title="OpenLIT" icon="magnifying-glass-chart" href="/pt-BR/observability/openlit">
Monitoramento nativo OpenTelemetry com rastreamento de custos e análises de desempenho.
</Card>

View File

@@ -11,7 +11,7 @@ dependencies = [
# Core Dependencies
"pydantic>=2.4.2",
"openai>=1.13.3",
"litellm==1.72.6",
"litellm==1.74.9",
"instructor>=1.3.3",
# Text Processing
"pdfplumber>=0.11.4",
@@ -39,6 +39,7 @@ dependencies = [
"tomli>=2.0.2",
"blinker>=1.9.0",
"json5>=0.10.0",
"portalocker==2.7.0",
]
[project.urls]
@@ -47,7 +48,7 @@ Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools~=0.49.0"]
tools = ["crewai-tools~=0.60.0"]
embeddings = [
"tiktoken~=0.8.0"
]

View File

@@ -54,7 +54,7 @@ def _track_install_async():
_track_install_async()
__version__ = "0.140.0"
__version__ = "0.157.0"
__all__ = [
"Agent",
"Crew",

View File

@@ -210,7 +210,6 @@ class Agent(BaseAgent):
sources=self.knowledge_sources,
embedder=self.embedder,
collection_name=self.role,
storage=self.knowledge_storage or None,
)
self.knowledge.add_sources()
except (TypeError, ValueError) as e:
@@ -223,11 +222,9 @@ class Agent(BaseAgent):
memory_attributes = [
"memory",
"memory_config",
"_short_term_memory",
"_long_term_memory",
"_entity_memory",
"_user_memory",
"_external_memory",
]
@@ -317,11 +314,9 @@ class Agent(BaseAgent):
start_time = time.time()
contextual_memory = ContextualMemory(
self.crew.memory_config,
self.crew._short_term_memory,
self.crew._long_term_memory,
self.crew._entity_memory,
self.crew._user_memory,
self.crew._external_memory,
)
memory = contextual_memory.build_context_for_task(task, context)
@@ -341,7 +336,8 @@ class Agent(BaseAgent):
self.knowledge_config.model_dump() if self.knowledge_config else {}
)
if self.knowledge:
if self.knowledge or (self.crew and self.crew.knowledge):
crewai_event_bus.emit(
self,
event=KnowledgeRetrievalStartedEvent(
@@ -353,25 +349,28 @@ class Agent(BaseAgent):
task_prompt
)
if self.knowledge_search_query:
agent_knowledge_snippets = self.knowledge.query(
[self.knowledge_search_query], **knowledge_config
)
if agent_knowledge_snippets:
self.agent_knowledge_context = extract_knowledge_context(
agent_knowledge_snippets
)
if self.agent_knowledge_context:
task_prompt += self.agent_knowledge_context
if self.crew:
knowledge_snippets = self.crew.query_knowledge(
# Querying agent-specific knowledge
if self.knowledge:
agent_knowledge_snippets = self.knowledge.query(
[self.knowledge_search_query], **knowledge_config
)
if knowledge_snippets:
self.crew_knowledge_context = extract_knowledge_context(
knowledge_snippets
if agent_knowledge_snippets:
self.agent_knowledge_context = extract_knowledge_context(
agent_knowledge_snippets
)
if self.crew_knowledge_context:
task_prompt += self.crew_knowledge_context
if self.agent_knowledge_context:
task_prompt += self.agent_knowledge_context
# Querying crew-specific knowledge
knowledge_snippets = self.crew.query_knowledge(
[self.knowledge_search_query], **knowledge_config
)
if knowledge_snippets:
self.crew_knowledge_context = extract_knowledge_context(
knowledge_snippets
)
if self.crew_knowledge_context:
task_prompt += self.crew_knowledge_context
crewai_event_bus.emit(
self,

View File

@@ -120,11 +120,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
raise
except Exception as e:
handle_unknown_error(self._printer, e)
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
else:
raise e
raise
if self.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)

View File

@@ -1,8 +1,6 @@
ALGORITHMS = ["RS256"]
#TODO: The AUTH0 constants should be removed after WorkOS migration is completed
AUTH0_DOMAIN = "crewai.us.auth0.com"
AUTH0_CLIENT_ID = "DEVC5Fw6NlRoSzmDCcOhVq85EfLBjKa8"
AUTH0_AUDIENCE = "https://crewai.us.auth0.com/api/v2/"
WORKOS_DOMAIN = "login.crewai.com"
WORKOS_CLI_CONNECT_APP_ID = "client_01JYT06R59SP0NXYGD994NFXXX"
WORKOS_ENVIRONMENT_ID = "client_01JNJQWB4HG8T5980R5VHP057C"

View File

@@ -1,76 +1,92 @@
import time
import webbrowser
from typing import Any, Dict
from typing import Any, Dict, Optional
import requests
from rich.console import Console
from pydantic import BaseModel, Field
from .constants import (
AUTH0_AUDIENCE,
AUTH0_CLIENT_ID,
AUTH0_DOMAIN,
WORKOS_DOMAIN,
WORKOS_CLI_CONNECT_APP_ID,
WORKOS_ENVIRONMENT_ID,
)
from .utils import TokenManager, validate_jwt_token
from urllib.parse import quote
from crewai.cli.plus_api import PlusAPI
from crewai.cli.config import Settings
from crewai.cli.authentication.constants import (
AUTH0_AUDIENCE,
AUTH0_CLIENT_ID,
AUTH0_DOMAIN,
)
console = Console()
class Oauth2Settings(BaseModel):
provider: str = Field(description="OAuth2 provider used for authentication (e.g., workos, okta, auth0).")
client_id: str = Field(description="OAuth2 client ID issued by the provider, used during authentication requests.")
domain: str = Field(description="OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens.")
audience: Optional[str] = Field(description="OAuth2 audience value, typically used to identify the target API or resource.", default=None)
@classmethod
def from_settings(cls):
settings = Settings()
return cls(
provider=settings.oauth2_provider,
domain=settings.oauth2_domain,
client_id=settings.oauth2_client_id,
audience=settings.oauth2_audience,
)
class ProviderFactory:
@classmethod
def from_settings(cls, settings: Optional[Oauth2Settings] = None):
settings = settings or Oauth2Settings.from_settings()
import importlib
module = importlib.import_module(f"crewai.cli.authentication.providers.{settings.provider.lower()}")
provider = getattr(module, f"{settings.provider.capitalize()}Provider")
return provider(settings)
class AuthenticationCommand:
AUTH0_DEVICE_CODE_URL = f"https://{AUTH0_DOMAIN}/oauth/device/code"
AUTH0_TOKEN_URL = f"https://{AUTH0_DOMAIN}/oauth/token"
WORKOS_DEVICE_CODE_URL = f"https://{WORKOS_DOMAIN}/oauth2/device_authorization"
WORKOS_TOKEN_URL = f"https://{WORKOS_DOMAIN}/oauth2/token"
def __init__(self):
self.token_manager = TokenManager()
# TODO: WORKOS - This variable is temporary until migration to WorkOS is complete.
self.user_provider = "workos"
self.oauth2_provider = ProviderFactory.from_settings()
def login(self) -> None:
"""Sign up to CrewAI+"""
device_code_url = self.WORKOS_DEVICE_CODE_URL
token_url = self.WORKOS_TOKEN_URL
client_id = WORKOS_CLI_CONNECT_APP_ID
audience = None
console.print("Signing in to CrewAI Enterprise...\n", style="bold blue")
# TODO: WORKOS - Next line and conditional are temporary until migration to WorkOS is complete.
user_provider = self._determine_user_provider()
if user_provider == "auth0":
device_code_url = self.AUTH0_DEVICE_CODE_URL
token_url = self.AUTH0_TOKEN_URL
client_id = AUTH0_CLIENT_ID
audience = AUTH0_AUDIENCE
self.user_provider = "auth0"
settings = Oauth2Settings(
provider="auth0",
client_id=AUTH0_CLIENT_ID,
domain=AUTH0_DOMAIN,
audience=AUTH0_AUDIENCE
)
self.oauth2_provider = ProviderFactory.from_settings(settings)
# End of temporary code.
device_code_data = self._get_device_code(client_id, device_code_url, audience)
device_code_data = self._get_device_code()
self._display_auth_instructions(device_code_data)
return self._poll_for_token(device_code_data, client_id, token_url)
return self._poll_for_token(device_code_data)
def _get_device_code(
self, client_id: str, device_code_url: str, audience: str | None = None
self
) -> Dict[str, Any]:
"""Get the device code to authenticate the user."""
device_code_payload = {
"client_id": client_id,
"client_id": self.oauth2_provider.get_client_id(),
"scope": "openid",
"audience": audience,
"audience": self.oauth2_provider.get_audience(),
}
response = requests.post(
url=device_code_url, data=device_code_payload, timeout=20
url=self.oauth2_provider.get_authorize_url(), data=device_code_payload, timeout=20
)
response.raise_for_status()
return response.json()
@@ -82,21 +98,21 @@ class AuthenticationCommand:
webbrowser.open(device_code_data["verification_uri_complete"])
def _poll_for_token(
self, device_code_data: Dict[str, Any], client_id: str, token_poll_url: str
self, device_code_data: Dict[str, Any]
) -> None:
"""Polls the server for the token until it is received, or max attempts are reached."""
token_payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": device_code_data["device_code"],
"client_id": client_id,
"client_id": self.oauth2_provider.get_client_id(),
}
console.print("\nWaiting for authentication... ", style="bold blue", end="")
attempts = 0
while True and attempts < 10:
response = requests.post(token_poll_url, data=token_payload, timeout=30)
response = requests.post(self.oauth2_provider.get_token_url(), data=token_payload, timeout=30)
token_data = response.json()
if response.status_code == 200:
@@ -128,19 +144,14 @@ class AuthenticationCommand:
"""Validates the JWT token and saves the token to the token manager."""
jwt_token = token_data["access_token"]
issuer = self.oauth2_provider.get_issuer()
jwt_token_data = {
"jwt_token": jwt_token,
"jwks_url": f"https://{WORKOS_DOMAIN}/oauth2/jwks",
"issuer": f"https://{WORKOS_DOMAIN}",
"audience": WORKOS_ENVIRONMENT_ID,
"jwks_url": self.oauth2_provider.get_jwks_url(),
"issuer": issuer,
"audience": self.oauth2_provider.get_audience(),
}
# TODO: WORKOS - The following conditional is temporary until migration to WorkOS is complete.
if self.user_provider == "auth0":
jwt_token_data["jwks_url"] = f"https://{AUTH0_DOMAIN}/.well-known/jwks.json"
jwt_token_data["issuer"] = f"https://{AUTH0_DOMAIN}/"
jwt_token_data["audience"] = AUTH0_AUDIENCE
decoded_token = validate_jwt_token(**jwt_token_data)
expires_at = decoded_token.get("exp", 0)

View File

@@ -0,0 +1,26 @@
from crewai.cli.authentication.providers.base_provider import BaseProvider
class Auth0Provider(BaseProvider):
def get_authorize_url(self) -> str:
return f"https://{self._get_domain()}/oauth/device/code"
def get_token_url(self) -> str:
return f"https://{self._get_domain()}/oauth/token"
def get_jwks_url(self) -> str:
return f"https://{self._get_domain()}/.well-known/jwks.json"
def get_issuer(self) -> str:
return f"https://{self._get_domain()}/"
def get_audience(self) -> str:
assert self.settings.audience is not None, "Audience is required"
return self.settings.audience
def get_client_id(self) -> str:
assert self.settings.client_id is not None, "Client ID is required"
return self.settings.client_id
def _get_domain(self) -> str:
assert self.settings.domain is not None, "Domain is required"
return self.settings.domain

View File

@@ -0,0 +1,30 @@
from abc import ABC, abstractmethod
from crewai.cli.authentication.main import Oauth2Settings
class BaseProvider(ABC):
def __init__(self, settings: Oauth2Settings):
self.settings = settings
@abstractmethod
def get_authorize_url(self) -> str:
...
@abstractmethod
def get_token_url(self) -> str:
...
@abstractmethod
def get_jwks_url(self) -> str:
...
@abstractmethod
def get_issuer(self) -> str:
...
@abstractmethod
def get_audience(self) -> str:
...
@abstractmethod
def get_client_id(self) -> str:
...

View File

@@ -0,0 +1,22 @@
from crewai.cli.authentication.providers.base_provider import BaseProvider
class OktaProvider(BaseProvider):
def get_authorize_url(self) -> str:
return f"https://{self.settings.domain}/oauth2/default/v1/device/authorize"
def get_token_url(self) -> str:
return f"https://{self.settings.domain}/oauth2/default/v1/token"
def get_jwks_url(self) -> str:
return f"https://{self.settings.domain}/oauth2/default/v1/keys"
def get_issuer(self) -> str:
return f"https://{self.settings.domain}/oauth2/default"
def get_audience(self) -> str:
assert self.settings.audience is not None
return self.settings.audience
def get_client_id(self) -> str:
assert self.settings.client_id is not None
return self.settings.client_id

View File

@@ -0,0 +1,25 @@
from crewai.cli.authentication.providers.base_provider import BaseProvider
class WorkosProvider(BaseProvider):
def get_authorize_url(self) -> str:
return f"https://{self._get_domain()}/oauth2/device_authorization"
def get_token_url(self) -> str:
return f"https://{self._get_domain()}/oauth2/token"
def get_jwks_url(self) -> str:
return f"https://{self._get_domain()}/oauth2/jwks"
def get_issuer(self) -> str:
return f"https://{self._get_domain()}"
def get_audience(self) -> str:
return self.settings.audience or ""
def get_client_id(self) -> str:
assert self.settings.client_id is not None, "Client ID is required"
return self.settings.client_id
def _get_domain(self) -> str:
assert self.settings.domain is not None, "Domain is required"
return self.settings.domain
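
Review note: `ProviderFactory` in the authentication diff above resolves the provider class from the configured name by importing `crewai.cli.authentication.providers.<name>` and looking up `<Name>Provider`. The sketch below illustrates the same naming convention in a self-contained form. It is an assumption-laden illustration, not the project's code: `DemoSettings`, `PROVIDERS`, and `resolve_provider` are hypothetical names, and an in-memory registry stands in for `importlib` so the example runs without CrewAI installed.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


@dataclass
class DemoSettings:
    # Hypothetical stand-in for Oauth2Settings from the diff.
    provider: str
    domain: str
    client_id: str
    audience: Optional[str] = None


class BaseProvider(ABC):
    def __init__(self, settings: DemoSettings):
        self.settings = settings

    @abstractmethod
    def get_token_url(self) -> str: ...

    @abstractmethod
    def get_issuer(self) -> str: ...


class WorkosProvider(BaseProvider):
    def get_token_url(self) -> str:
        return f"https://{self.settings.domain}/oauth2/token"

    def get_issuer(self) -> str:
        return f"https://{self.settings.domain}"


class Auth0Provider(BaseProvider):
    def get_token_url(self) -> str:
        return f"https://{self.settings.domain}/oauth/token"

    def get_issuer(self) -> str:
        # Auth0 issuers carry a trailing slash, as in the diff.
        return f"https://{self.settings.domain}/"


# Hypothetical registry replacing the importlib lookup used in the diff.
PROVIDERS = {cls.__name__: cls for cls in (WorkosProvider, Auth0Provider)}


def resolve_provider(settings: DemoSettings) -> BaseProvider:
    # Same convention as the diff: "workos" -> "WorkosProvider".
    class_name = f"{settings.provider.capitalize()}Provider"
    try:
        return PROVIDERS[class_name](settings)
    except KeyError:
        raise ValueError(
            f"Unsupported OAuth2 provider: {settings.provider!r}"
        ) from None
```

Because `str.capitalize()` lowercases everything after the first character, `"AUTH0"` and `"auth0"` both map to `Auth0Provider`. A registry or an explicit error like the one above makes an unknown provider name fail with a readable message rather than an import error.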

View File

@@ -30,6 +30,9 @@ def validate_jwt_token(
jwk_client = PyJWKClient(jwks_url)
signing_key = jwk_client.get_signing_key_from_jwt(jwt_token)
_unverified_decoded_token = jwt.decode(
jwt_token, options={"verify_signature": False}
)
decoded_token = jwt.decode(
jwt_token,
signing_key.key,
@@ -49,9 +52,15 @@ def validate_jwt_token(
except jwt.ExpiredSignatureError:
raise Exception("Token has expired.")
except jwt.InvalidAudienceError:
raise Exception(f"Invalid token audience. Expected: '{audience}'")
actual_audience = _unverified_decoded_token.get("aud", "[no audience found]")
raise Exception(
f"Invalid token audience. Got: '{actual_audience}'. Expected: '{audience}'"
)
except jwt.InvalidIssuerError:
raise Exception(f"Invalid token issuer. Expected: '{issuer}'")
actual_issuer = _unverified_decoded_token.get("iss", "[no issuer found]")
raise Exception(
f"Invalid token issuer. Got: '{actual_issuer}'. Expected: '{issuer}'"
)
except jwt.MissingRequiredClaimError as e:
raise Exception(f"Token is missing required claims: {str(e)}")
except jwt.exceptions.PyJWKClientError as e:

View File

@@ -3,6 +3,7 @@ from typing import Optional
import click
from crewai.cli.config import Settings
from crewai.cli.settings.main import SettingsCommand
from crewai.cli.add_crew_to_flow import add_crew_to_flow
from crewai.cli.create_crew import create_crew
from crewai.cli.create_flow import create_flow
@@ -227,7 +228,7 @@ def update():
@crewai.command()
def login():
"""Sign Up/Login to CrewAI Enterprise."""
Settings().clear()
Settings().clear_user_settings()
AuthenticationCommand().login()
@@ -369,8 +370,8 @@ def org():
pass
@org.command()
def list():
@org.command("list")
def org_list():
"""List available organizations."""
org_command = OrganizationCommand()
org_command.list()
@@ -391,5 +392,34 @@ def current():
org_command.current()
@crewai.group()
def config():
"""CLI Configuration commands."""
pass
@config.command("list")
def config_list():
"""List all CLI configuration parameters."""
config_command = SettingsCommand()
config_command.list()
@config.command("set")
@click.argument("key")
@click.argument("value")
def config_set(key: str, value: str):
"""Set a CLI configuration parameter."""
config_command = SettingsCommand()
config_command.set(key, value)
@config.command("reset")
def config_reset():
"""Reset all CLI configuration parameters to default values."""
config_command = SettingsCommand()
config_command.reset_all_settings()
if __name__ == "__main__":
crewai()

View File

@@ -26,7 +26,7 @@ class PlusAPIMixin:
"Please sign up/login to CrewAI+ before using the CLI.",
style="bold red",
)
console.print("Run 'crewai signup' to sign up/login.", style="bold green")
console.print("Run 'crewai login' to sign up/login.", style="bold green")
raise SystemExit
def _validate_response(self, response: requests.Response) -> None:

View File

@@ -4,10 +4,60 @@ from typing import Optional
from pydantic import BaseModel, Field
from crewai.cli.constants import (
DEFAULT_CREWAI_ENTERPRISE_URL,
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_PROVIDER,
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE,
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID,
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN,
)
DEFAULT_CONFIG_PATH = Path.home() / ".config" / "crewai" / "settings.json"
# Settings that are related to the user's account
USER_SETTINGS_KEYS = [
"tool_repository_username",
"tool_repository_password",
"org_name",
"org_uuid",
]
# Settings that are related to the CLI
CLI_SETTINGS_KEYS = [
"enterprise_base_url",
"oauth2_provider",
"oauth2_audience",
"oauth2_client_id",
"oauth2_domain",
]
# Default values for CLI settings
DEFAULT_CLI_SETTINGS = {
"enterprise_base_url": DEFAULT_CREWAI_ENTERPRISE_URL,
"oauth2_provider": CREWAI_ENTERPRISE_DEFAULT_OAUTH2_PROVIDER,
"oauth2_audience": CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE,
"oauth2_client_id": CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID,
"oauth2_domain": CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN,
}
# Readonly settings - cannot be set by the user
READONLY_SETTINGS_KEYS = [
"org_name",
"org_uuid",
]
# Hidden settings - not displayed by the 'list' command and cannot be set by the user
HIDDEN_SETTINGS_KEYS = [
"config_path",
"tool_repository_username",
"tool_repository_password",
]
class Settings(BaseModel):
enterprise_base_url: Optional[str] = Field(
default=DEFAULT_CLI_SETTINGS["enterprise_base_url"],
description="Base URL of the CrewAI Enterprise instance",
)
tool_repository_username: Optional[str] = Field(
None, description="Username for interacting with the Tool Repository"
)
@@ -20,7 +70,27 @@ class Settings(BaseModel):
org_uuid: Optional[str] = Field(
None, description="UUID of the currently active organization"
)
config_path: Path = Field(default=DEFAULT_CONFIG_PATH, exclude=True)
config_path: Path = Field(default=DEFAULT_CONFIG_PATH, frozen=True, exclude=True)
oauth2_provider: str = Field(
description="OAuth2 provider used for authentication (e.g., workos, okta, auth0).",
default=DEFAULT_CLI_SETTINGS["oauth2_provider"]
)
oauth2_audience: Optional[str] = Field(
description="OAuth2 audience value, typically used to identify the target API or resource.",
default=DEFAULT_CLI_SETTINGS["oauth2_audience"]
)
oauth2_client_id: str = Field(
default=DEFAULT_CLI_SETTINGS["oauth2_client_id"],
description="OAuth2 client ID issued by the provider, used during authentication requests.",
)
oauth2_domain: str = Field(
description="OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens.",
default=DEFAULT_CLI_SETTINGS["oauth2_domain"]
)
def __init__(self, config_path: Path = DEFAULT_CONFIG_PATH, **data):
"""Load Settings from config path"""
@@ -37,9 +107,16 @@ class Settings(BaseModel):
merged_data = {**file_data, **data}
super().__init__(config_path=config_path, **merged_data)
def clear(self) -> None:
"""Clear all settings"""
self.config_path.unlink(missing_ok=True)
def clear_user_settings(self) -> None:
"""Clear all user settings"""
self._reset_user_settings()
self.dump()
def reset(self) -> None:
"""Reset all settings to default values"""
self._reset_user_settings()
self._reset_cli_settings()
self.dump()
def dump(self) -> None:
"""Save current settings to settings.json"""
@@ -52,3 +129,13 @@ class Settings(BaseModel):
updated_data = {**existing_data, **self.model_dump(exclude_unset=True)}
with self.config_path.open("w") as f:
json.dump(updated_data, f, indent=4)
def _reset_user_settings(self) -> None:
"""Reset all user settings to default values"""
for key in USER_SETTINGS_KEYS:
setattr(self, key, None)
def _reset_cli_settings(self) -> None:
"""Reset all CLI settings to default values"""
for key in CLI_SETTINGS_KEYS:
setattr(self, key, DEFAULT_CLI_SETTINGS.get(key))
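
Review note: the split between `clear_user_settings()` and `reset()` above means that `crewai login` drops account data but keeps CLI settings such as a custom OAuth2 provider, while `crewai config reset` restores both. A minimal sketch of that behavior follows, assuming a plain dataclass in place of the pydantic `Settings` model and omitting persistence to `settings.json`. `DemoSettings`, `USER_KEYS`, `CLI_KEYS`, and `CLI_DEFAULTS` are hypothetical, reduced versions of the names in the diff.

```python
from dataclasses import dataclass
from typing import Optional

# Reduced, illustrative key lists; the diff defines longer ones.
USER_KEYS = ["org_name", "org_uuid"]
CLI_KEYS = ["enterprise_base_url", "oauth2_provider"]
CLI_DEFAULTS = {
    "enterprise_base_url": "https://app.crewai.com",
    "oauth2_provider": "workos",
}


@dataclass
class DemoSettings:
    enterprise_base_url: Optional[str] = CLI_DEFAULTS["enterprise_base_url"]
    oauth2_provider: str = CLI_DEFAULTS["oauth2_provider"]
    org_name: Optional[str] = None
    org_uuid: Optional[str] = None

    def clear_user_settings(self) -> None:
        """Drop account-related values; leave CLI settings untouched."""
        for key in USER_KEYS:
            setattr(self, key, None)

    def reset(self) -> None:
        """Drop account values and restore CLI settings to their defaults."""
        self.clear_user_settings()
        for key in CLI_KEYS:
            setattr(self, key, CLI_DEFAULTS[key])


settings = DemoSettings(oauth2_provider="okta", org_name="acme")
settings.clear_user_settings()   # what login does: org_name cleared, okta kept
settings.reset()                 # what config reset does: back to workos
```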

View File

@@ -1,3 +1,9 @@
DEFAULT_CREWAI_ENTERPRISE_URL = "https://app.crewai.com"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_PROVIDER = "workos"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE = "client_01JNJQWBJ4SPFN3SWJM5T7BDG8"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID = "client_01JYT06R59SP0NXYGD994NFXXX"
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN = "login.crewai.com"
ENV_VARS = {
"openai": [
{
@@ -320,5 +326,4 @@ DEFAULT_LLM_MODEL = "gpt-4o-mini"
JSON_URL = "https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json"
LITELLM_PARAMS = ["api_key", "api_base", "api_version"]

View File

@@ -1,4 +1,3 @@
from os import getenv
from typing import List, Optional
from urllib.parse import urljoin
@@ -6,6 +5,7 @@ import requests
from crewai.cli.config import Settings
from crewai.cli.version import get_crewai_version
from crewai.cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
class PlusAPI:
@@ -17,6 +17,7 @@ class PlusAPI:
ORGANIZATIONS_RESOURCE = "/crewai_plus/api/v1/me/organizations"
CREWS_RESOURCE = "/crewai_plus/api/v1/crews"
AGENTS_RESOURCE = "/crewai_plus/api/v1/agents"
TRACING_RESOURCE = "/crewai_plus/api/v1/tracing"
def __init__(self, api_key: str) -> None:
self.api_key = api_key
@@ -29,7 +30,10 @@ class PlusAPI:
settings = Settings()
if settings.org_uuid:
self.headers["X-Crewai-Organization-Id"] = settings.org_uuid
self.base_url = getenv("CREWAI_BASE_URL", "https://app.crewai.com")
self.base_url = (
str(settings.enterprise_base_url) or DEFAULT_CREWAI_ENTERPRISE_URL
)
def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
url = urljoin(self.base_url, endpoint)
@@ -108,7 +112,28 @@ class PlusAPI:
def create_crew(self, payload) -> requests.Response:
return self._make_request("POST", self.CREWS_RESOURCE, json=payload)
def get_organizations(self) -> requests.Response:
return self._make_request("GET", self.ORGANIZATIONS_RESOURCE)
def send_trace_batch(self, payload) -> requests.Response:
return self._make_request("POST", self.TRACING_RESOURCE, json=payload)
def initialize_trace_batch(self, payload) -> requests.Response:
return self._make_request(
"POST", f"{self.TRACING_RESOURCE}/batches", json=payload
)
def send_trace_events(self, trace_batch_id: str, payload) -> requests.Response:
return self._make_request(
"POST",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/events",
json=payload,
)
def finalize_trace_batch(self, trace_batch_id: str, payload) -> requests.Response:
return self._make_request(
"PATCH",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
)
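
Review note: the new tracing methods build their endpoints from `TRACING_RESOURCE` and pass them through `urljoin` in `_make_request`. The sketch below shows how those URLs resolve and highlights two behaviors worth checking. `trace_urls` and `resolve_base_url` are hypothetical helpers written for this note, and the custom host is an example value.

```python
from urllib.parse import urljoin

TRACING_RESOURCE = "/crewai_plus/api/v1/tracing"
DEFAULT_BASE_URL = "https://app.crewai.com"


def resolve_base_url(configured, default=DEFAULT_BASE_URL) -> str:
    """Fall back to the default when no base URL is configured.

    Note: `str(None)` is the truthy string "None", so the value is tested
    before it is converted to a string.
    """
    return str(configured) if configured else default


def trace_urls(base_url: str, trace_batch_id: str) -> dict:
    """Resolve the three batch endpoints the diff adds to PlusAPI."""
    batches = f"{TRACING_RESOURCE}/batches"
    return {
        "initialize": urljoin(base_url, batches),
        "events": urljoin(base_url, f"{batches}/{trace_batch_id}/events"),
        "finalize": urljoin(base_url, f"{batches}/{trace_batch_id}/finalize"),
    }


urls = trace_urls(resolve_base_url(None), "abc123")
print(urls["events"])

# An endpoint starting with "/" replaces any path already on the base URL.
print(urljoin("https://host.example/prefix/", TRACING_RESOURCE))
```

Two points follow from this. First, the expression `str(settings.enterprise_base_url) or DEFAULT_CREWAI_ENTERPRISE_URL` in the diff would yield the string `"None"` if the setting were ever `None`, since the fallback only triggers on an empty string. Second, because the resource paths are absolute, a self-hosted base URL that includes a path prefix would lose that prefix when joined.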

View File

@@ -0,0 +1,67 @@
from rich.console import Console
from rich.table import Table
from crewai.cli.command import BaseCommand
from crewai.cli.config import Settings, READONLY_SETTINGS_KEYS, HIDDEN_SETTINGS_KEYS
from typing import Any
console = Console()
class SettingsCommand(BaseCommand):
"""A class to handle CLI configuration commands."""
def __init__(self, settings_kwargs: dict[str, Any] = {}):
super().__init__()
self.settings = Settings(**settings_kwargs)
def list(self) -> None:
"""List all CLI configuration parameters."""
table = Table(title="CrewAI CLI Configuration")
table.add_column("Setting", style="cyan", no_wrap=True)
table.add_column("Value", style="green")
table.add_column("Description", style="yellow")
# Add all settings to the table
for field_name, field_info in Settings.model_fields.items():
if field_name in HIDDEN_SETTINGS_KEYS:
# Do not display hidden settings
continue
current_value = getattr(self.settings, field_name)
description = field_info.description or "No description available"
display_value = (
str(current_value) if current_value is not None else "Not set"
)
table.add_row(field_name, display_value, description)
console.print(table)
def set(self, key: str, value: str) -> None:
"""Set a CLI configuration parameter."""
readonly_settings = READONLY_SETTINGS_KEYS + HIDDEN_SETTINGS_KEYS
if not hasattr(self.settings, key) or key in readonly_settings:
console.print(
f"Error: Unknown or readonly configuration key '{key}'",
style="bold red",
)
console.print("Available keys:", style="yellow")
for field_name in Settings.model_fields.keys():
if field_name not in readonly_settings:
console.print(f" - {field_name}", style="yellow")
raise SystemExit(1)
setattr(self.settings, key, value)
self.settings.dump()
console.print(f"Successfully set '{key}' to '{value}'", style="bold green")
def reset_all_settings(self) -> None:
"""Reset all CLI configuration parameters to default values."""
self.settings.reset()
console.print(
"Successfully reset all configuration parameters to default values. It is recommended to run [bold yellow]'crewai login'[/bold yellow] to re-authenticate.",
style="bold green",
)
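
Review note: `SettingsCommand.__init__` above declares `settings_kwargs: dict[str, Any] = {}`. As written this appears harmless, because the dict is only unpacked into `Settings(**settings_kwargs)` and never mutated, but a mutable default is shared across calls and becomes a bug as soon as someone modifies it. The sketch below demonstrates the pitfall and the usual `None` sentinel alternative. Both functions are hypothetical and exist only for this illustration.

```python
from typing import Any, Optional


def with_shared_default(key: str, kwargs: dict[str, Any] = {}) -> dict[str, Any]:
    """Pitfall: the default dict is created once and reused by every call."""
    kwargs[key] = True
    return kwargs


def with_none_sentinel(
    key: str, kwargs: Optional[dict[str, Any]] = None
) -> dict[str, Any]:
    """Safer: build a fresh dict whenever the caller passes nothing."""
    kwargs = dict(kwargs or {})
    kwargs[key] = True
    return kwargs


first = with_shared_default("a")
second = with_shared_default("b")
# Both names point at the same dict, which now holds "a" and "b".
print(first is second, sorted(second))

# Each call gets its own dict, so no state leaks between calls.
print(with_none_sentinel("a"), with_none_sentinel("b"))
```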

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.140.0,<1.0.0"
"crewai[tools]>=0.157.0,<1.0.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.140.0,<1.0.0",
"crewai[tools]>=0.157.0,<1.0.0",
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.140.0"
"crewai[tools]>=0.157.0"
]
[tool.crewai]

View File

@@ -1,3 +1,4 @@
import os
import asyncio
import json
import re
@@ -47,7 +48,6 @@ from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.external.external_memory import ExternalMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.user.user_memory import UserMemory
from crewai.process import Process
from crewai.security import Fingerprint, SecurityConfig
from crewai.task import Task
@@ -73,6 +73,11 @@ from crewai.utilities.events.crew_events import (
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.event_listener import EventListener
from crewai.utilities.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -95,7 +100,6 @@ class Crew(FlowTrackable, BaseModel):
manager_llm: The language model that will run manager agent.
manager_agent: Custom agent that will be used as manager.
memory: Whether the crew should use memory to store memories of it's execution.
memory_config: Configuration for the memory to be used for the crew.
cache: Whether the crew should use a cache to store the results of the tools execution.
function_calling_llm: The language model that will run the tool calling for all the agents.
process: The process flow that the crew will follow (e.g., sequential, hierarchical).
@@ -121,7 +125,6 @@ class Crew(FlowTrackable, BaseModel):
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
_user_memory: Optional[InstanceOf[UserMemory]] = PrivateAttr()
_external_memory: Optional[InstanceOf[ExternalMemory]] = PrivateAttr()
_train: Optional[bool] = PrivateAttr(default=False)
_train_iteration: Optional[int] = PrivateAttr()
@@ -133,7 +136,7 @@ class Crew(FlowTrackable, BaseModel):
default_factory=TaskOutputStorageHandler
)
name: Optional[str] = Field(default=None)
name: Optional[str] = Field(default="crew")
cache: bool = Field(default=True)
tasks: List[Task] = Field(default_factory=list)
agents: List[BaseAgent] = Field(default_factory=list)
@@ -143,10 +146,6 @@ class Crew(FlowTrackable, BaseModel):
default=False,
description="Whether the crew should use memory to store memories of it's execution",
)
memory_config: Optional[Dict[str, Any]] = Field(
default=None,
description="Configuration for the memory to be used for the crew.",
)
short_term_memory: Optional[InstanceOf[ShortTermMemory]] = Field(
default=None,
description="An Instance of the ShortTermMemory to be used by the Crew",
@@ -159,10 +158,6 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="An Instance of the EntityMemory to be used by the Crew",
)
user_memory: Optional[InstanceOf[UserMemory]] = Field(
default=None,
description="An instance of the UserMemory to be used by the Crew to store/fetch memories of a specific user.",
)
external_memory: Optional[InstanceOf[ExternalMemory]] = Field(
default=None,
description="An Instance of the ExternalMemory to be used by the Crew",
@@ -249,6 +244,10 @@ class Crew(FlowTrackable, BaseModel):
default_factory=SecurityConfig,
description="Security configuration for the crew, including fingerprinting.",
)
token_usage: Optional[UsageMetrics] = Field(
default=None,
description="Metrics for the LLM usage during all tasks execution.",
)
@field_validator("id", mode="before")
@classmethod
@@ -280,6 +279,9 @@ class Crew(FlowTrackable, BaseModel):
self._cache_handler = CacheHandler()
event_listener = EventListener()
if os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true":
trace_listener = TraceCollectionListener()
trace_listener.setup_listeners(crewai_event_bus)
event_listener.verbose = self.verbose
event_listener.formatter.verbose = self.verbose
self._logger = Logger(verbose=self.verbose)
@@ -291,20 +293,6 @@ class Crew(FlowTrackable, BaseModel):
return self
def _initialize_user_memory(self):
if (
self.memory_config
and "user_memory" in self.memory_config
and self.memory_config.get("provider") == "mem0"
): # Check for user_memory in config
user_memory_config = self.memory_config["user_memory"]
if isinstance(
user_memory_config, dict
): # Check if it's a configuration dict
self._user_memory = UserMemory(crew=self)
else:
raise TypeError("user_memory must be a configuration dictionary")
def _initialize_default_memories(self):
self._long_term_memory = self._long_term_memory or LongTermMemory()
self._short_term_memory = self._short_term_memory or ShortTermMemory(
@@ -327,12 +315,8 @@ class Crew(FlowTrackable, BaseModel):
self._short_term_memory = self.short_term_memory
self._entity_memory = self.entity_memory
# UserMemory is gonna to be deprecated in the future, but we have to initialize a default value for now
self._user_memory = None
if self.memory:
self._initialize_default_memories()
self._initialize_user_memory()
return self
@@ -575,7 +559,7 @@ class Crew(FlowTrackable, BaseModel):
crewai_event_bus.emit(
self,
CrewTrainStartedEvent(
crew_name=self.name or "crew",
crew_name=self.name,
n_iterations=n_iterations,
filename=filename,
inputs=inputs,
@@ -602,7 +586,7 @@ class Crew(FlowTrackable, BaseModel):
crewai_event_bus.emit(
self,
CrewTrainCompletedEvent(
crew_name=self.name or "crew",
crew_name=self.name,
n_iterations=n_iterations,
filename=filename,
),
@@ -610,7 +594,7 @@ class Crew(FlowTrackable, BaseModel):
except Exception as e:
crewai_event_bus.emit(
self,
CrewTrainFailedEvent(error=str(e), crew_name=self.name or "crew"),
CrewTrainFailedEvent(error=str(e), crew_name=self.name),
)
self._logger.log("error", f"Training failed: {e}", color="red")
CrewTrainingHandler(TRAINING_DATA_FILE).clear()
@@ -634,7 +618,7 @@ class Crew(FlowTrackable, BaseModel):
crewai_event_bus.emit(
self,
CrewKickoffStartedEvent(crew_name=self.name or "crew", inputs=inputs),
CrewKickoffStartedEvent(crew_name=self.name, inputs=inputs),
)
# Starts the crew to work on its assigned tasks.
@@ -683,7 +667,7 @@ class Crew(FlowTrackable, BaseModel):
except Exception as e:
crewai_event_bus.emit(
self,
CrewKickoffFailedEvent(error=str(e), crew_name=self.name or "crew"),
CrewKickoffFailedEvent(error=str(e), crew_name=self.name),
)
raise
finally:
@@ -1073,11 +1057,13 @@ class Crew(FlowTrackable, BaseModel):
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
self.token_usage = self.calculate_usage_metrics()
crewai_event_bus.emit(
self,
CrewKickoffCompletedEvent(
crew_name=self.name or "crew", output=final_task_output
crew_name=self.name,
output=final_task_output,
total_tokens=self.token_usage.total_tokens,
),
)
return CrewOutput(
@@ -1085,7 +1071,7 @@ class Crew(FlowTrackable, BaseModel):
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
tasks_output=task_outputs,
token_usage=token_usage,
token_usage=self.token_usage,
)
def _process_async_tasks(
@@ -1254,8 +1240,6 @@ class Crew(FlowTrackable, BaseModel):
copied_data["entity_memory"] = self.entity_memory.model_copy(deep=True)
if self.external_memory:
copied_data["external_memory"] = self.external_memory.model_copy(deep=True)
if self.user_memory:
copied_data["user_memory"] = self.user_memory.model_copy(deep=True)
copied_data.pop("agents", None)
copied_data.pop("tasks", None)
@@ -1324,13 +1308,14 @@ class Crew(FlowTrackable, BaseModel):
crewai_event_bus.emit(
self,
CrewTestStartedEvent(
crew_name=self.name or "crew",
crew_name=self.name,
n_iterations=n_iterations,
eval_llm=llm_instance,
inputs=inputs,
),
)
test_crew = self.copy()
evaluator = CrewEvaluator(test_crew, llm_instance)
for i in range(1, n_iterations + 1):
@@ -1342,13 +1327,13 @@ class Crew(FlowTrackable, BaseModel):
crewai_event_bus.emit(
self,
CrewTestCompletedEvent(
crew_name=self.name or "crew",
crew_name=self.name,
),
)
except Exception as e:
crewai_event_bus.emit(
self,
CrewTestFailedEvent(error=str(e), crew_name=self.name or "crew"),
CrewTestFailedEvent(error=str(e), crew_name=self.name),
)
raise
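
Review note: in the `Crew` diff above, the trace listener is attached only when `os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true"`. The sketch below isolates that check so its accepted values are easy to see. `tracing_enabled` is a hypothetical helper, and the optional `environ` parameter is an assumption added here so the check can be exercised without touching the real environment.

```python
import os
from typing import Mapping, Optional


def tracing_enabled(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Mirror the diff's check: only the string "true" (any case) enables tracing."""
    env = os.environ if environ is None else environ
    return env.get("CREWAI_TRACING_ENABLED", "false").lower() == "true"


for value in ("true", "TRUE", "1", "yes", ""):
    print(repr(value), tracing_enabled({"CREWAI_TRACING_ENABLED": value}))
```

Values such as `1` or `yes` leave tracing disabled under this check, so anyone enabling it from a shell or CI configuration needs to set the variable to `true` exactly (case aside).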

View File

@@ -0,0 +1,40 @@
from crewai.experimental.evaluation import (
BaseEvaluator,
EvaluationScore,
MetricCategory,
AgentEvaluationResult,
SemanticQualityEvaluator,
GoalAlignmentEvaluator,
ReasoningEfficiencyEvaluator,
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator,
EvaluationTraceCallback,
create_evaluation_callbacks,
AgentEvaluator,
create_default_evaluator,
ExperimentRunner,
ExperimentResults,
ExperimentResult,
)
__all__ = [
"BaseEvaluator",
"EvaluationScore",
"MetricCategory",
"AgentEvaluationResult",
"SemanticQualityEvaluator",
"GoalAlignmentEvaluator",
"ReasoningEfficiencyEvaluator",
"ToolSelectionEvaluator",
"ParameterExtractionEvaluator",
"ToolInvocationEvaluator",
"EvaluationTraceCallback",
"create_evaluation_callbacks",
"AgentEvaluator",
"create_default_evaluator",
"ExperimentRunner",
"ExperimentResults",
"ExperimentResult"
]

View File

@@ -0,0 +1,51 @@
from crewai.experimental.evaluation.base_evaluator import (
    BaseEvaluator,
    EvaluationScore,
    MetricCategory,
    AgentEvaluationResult
)

from crewai.experimental.evaluation.metrics import (
    SemanticQualityEvaluator,
    GoalAlignmentEvaluator,
    ReasoningEfficiencyEvaluator,
    ToolSelectionEvaluator,
    ParameterExtractionEvaluator,
    ToolInvocationEvaluator
)

from crewai.experimental.evaluation.evaluation_listener import (
    EvaluationTraceCallback,
    create_evaluation_callbacks
)

from crewai.experimental.evaluation.agent_evaluator import (
    AgentEvaluator,
    create_default_evaluator
)

from crewai.experimental.evaluation.experiment import (
    ExperimentRunner,
    ExperimentResults,
    ExperimentResult
)

__all__ = [
    "BaseEvaluator",
    "EvaluationScore",
    "MetricCategory",
    "AgentEvaluationResult",
    "SemanticQualityEvaluator",
    "GoalAlignmentEvaluator",
    "ReasoningEfficiencyEvaluator",
    "ToolSelectionEvaluator",
    "ParameterExtractionEvaluator",
    "ToolInvocationEvaluator",
    "EvaluationTraceCallback",
    "create_evaluation_callbacks",
    "AgentEvaluator",
    "create_default_evaluator",
    "ExperimentRunner",
    "ExperimentResults",
    "ExperimentResult"
]


@@ -0,0 +1,245 @@
import threading
from typing import Any
from crewai.experimental.evaluation.base_evaluator import AgentEvaluationResult, AggregationStrategy
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.evaluation_display import EvaluationDisplayFormatter
from crewai.utilities.events.agent_events import AgentEvaluationStartedEvent, AgentEvaluationCompletedEvent, AgentEvaluationFailedEvent
from crewai.experimental.evaluation import BaseEvaluator, create_evaluation_callbacks
from collections.abc import Sequence
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.events.task_events import TaskCompletedEvent
from crewai.utilities.events.agent_events import LiteAgentExecutionCompletedEvent
from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult, EvaluationScore, MetricCategory
class ExecutionState:
    def __init__(self):
        self.traces = {}
        self.current_agent_id: str | None = None
        self.current_task_id: str | None = None
        self.iteration = 1
        self.iterations_results = {}
        self.agent_evaluators = {}


class AgentEvaluator:
    def __init__(
        self,
        agents: list[Agent],
        evaluators: Sequence[BaseEvaluator] | None = None,
    ):
        self.agents: list[Agent] = agents
        self.evaluators: Sequence[BaseEvaluator] | None = evaluators

        self.callback = create_evaluation_callbacks()
        self.console_formatter = ConsoleFormatter()
        self.display_formatter = EvaluationDisplayFormatter()

        self._thread_local: threading.local = threading.local()

        for agent in self.agents:
            self._execution_state.agent_evaluators[str(agent.id)] = self.evaluators

        self._subscribe_to_events()

    @property
    def _execution_state(self) -> ExecutionState:
        if not hasattr(self._thread_local, 'execution_state'):
            self._thread_local.execution_state = ExecutionState()
        return self._thread_local.execution_state

    def _subscribe_to_events(self) -> None:
        from typing import cast
        crewai_event_bus.register_handler(TaskCompletedEvent, cast(Any, self._handle_task_completed))
        crewai_event_bus.register_handler(LiteAgentExecutionCompletedEvent, cast(Any, self._handle_lite_agent_completed))

    def _handle_task_completed(self, source: Any, event: TaskCompletedEvent) -> None:
        assert event.task is not None
        agent = event.task.agent
        if agent and str(getattr(agent, 'id', 'unknown')) in self._execution_state.agent_evaluators:
            self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=str(event.task.id))

            state = ExecutionState()
            state.current_agent_id = str(agent.id)
            state.current_task_id = str(event.task.id)

            assert state.current_agent_id is not None and state.current_task_id is not None
            trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)

            if not trace:
                return

            result = self.evaluate(
                agent=agent,
                task=event.task,
                execution_trace=trace,
                final_output=event.output,
                state=state
            )

            current_iteration = self._execution_state.iteration
            if current_iteration not in self._execution_state.iterations_results:
                self._execution_state.iterations_results[current_iteration] = {}

            if agent.role not in self._execution_state.iterations_results[current_iteration]:
                self._execution_state.iterations_results[current_iteration][agent.role] = []

            self._execution_state.iterations_results[current_iteration][agent.role].append(result)

    def _handle_lite_agent_completed(self, source: object, event: LiteAgentExecutionCompletedEvent) -> None:
        agent_info = event.agent_info
        agent_id = str(agent_info["id"])

        if agent_id in self._execution_state.agent_evaluators:
            state = ExecutionState()
            state.current_agent_id = agent_id
            state.current_task_id = "lite_task"

            target_agent = None
            for agent in self.agents:
                if str(agent.id) == agent_id:
                    target_agent = agent
                    break

            if not target_agent:
                return

            assert state.current_agent_id is not None and state.current_task_id is not None
            trace = self.callback.get_trace(state.current_agent_id, state.current_task_id)

            if not trace:
                return

            result = self.evaluate(
                agent=target_agent,
                execution_trace=trace,
                final_output=event.output,
                state=state
            )

            current_iteration = self._execution_state.iteration
            if current_iteration not in self._execution_state.iterations_results:
                self._execution_state.iterations_results[current_iteration] = {}

            agent_role = target_agent.role
            if agent_role not in self._execution_state.iterations_results[current_iteration]:
                self._execution_state.iterations_results[current_iteration][agent_role] = []

            self._execution_state.iterations_results[current_iteration][agent_role].append(result)

    def set_iteration(self, iteration: int) -> None:
        self._execution_state.iteration = iteration

    def reset_iterations_results(self) -> None:
        self._execution_state.iterations_results = {}

    def get_evaluation_results(self) -> dict[str, list[AgentEvaluationResult]]:
        if self._execution_state.iterations_results and self._execution_state.iteration in self._execution_state.iterations_results:
            return self._execution_state.iterations_results[self._execution_state.iteration]
        return {}

    def display_results_with_iterations(self) -> None:
        self.display_formatter.display_summary_results(self._execution_state.iterations_results)

    def get_agent_evaluation(self, strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE, include_evaluation_feedback: bool = True) -> dict[str, AgentAggregatedEvaluationResult]:
        agent_results = {}
        with crewai_event_bus.scoped_handlers():
            task_results = self.get_evaluation_results()
            for agent_role, results in task_results.items():
                if not results:
                    continue

                agent_id = results[0].agent_id

                aggregated_result = self.display_formatter._aggregate_agent_results(
                    agent_id=agent_id,
                    agent_role=agent_role,
                    results=results,
                    strategy=strategy
                )

                agent_results[agent_role] = aggregated_result

            if self._execution_state.iterations_results and self._execution_state.iteration == max(self._execution_state.iterations_results.keys(), default=0):
                self.display_results_with_iterations()

            if include_evaluation_feedback:
                self.display_evaluation_with_feedback()

        return agent_results

    def display_evaluation_with_feedback(self) -> None:
        self.display_formatter.display_evaluation_with_feedback(self._execution_state.iterations_results)

    def evaluate(
        self,
        agent: Agent,
        execution_trace: dict[str, Any],
        final_output: Any,
        state: ExecutionState,
        task: Task | None = None,
    ) -> AgentEvaluationResult:
        result = AgentEvaluationResult(
            agent_id=state.current_agent_id or str(agent.id),
            task_id=state.current_task_id or (str(task.id) if task else "unknown_task")
        )

        assert self.evaluators is not None
        task_id = str(task.id) if task else None
        for evaluator in self.evaluators:
            try:
                self.emit_evaluation_started_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id)
                score = evaluator.evaluate(
                    agent=agent,
                    task=task,
                    execution_trace=execution_trace,
                    final_output=final_output
                )
                result.metrics[evaluator.metric_category] = score
                self.emit_evaluation_completed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, metric_category=evaluator.metric_category, score=score)
            except Exception as e:
                self.emit_evaluation_failed_event(agent_role=agent.role, agent_id=str(agent.id), task_id=task_id, error=str(e))
                self.console_formatter.print(f"Error in {evaluator.metric_category.value} evaluator: {str(e)}")

        return result

    def emit_evaluation_started_event(self, agent_role: str, agent_id: str, task_id: str | None = None):
        crewai_event_bus.emit(
            self,
            AgentEvaluationStartedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration)
        )

    def emit_evaluation_completed_event(self, agent_role: str, agent_id: str, task_id: str | None = None, metric_category: MetricCategory | None = None, score: EvaluationScore | None = None):
        crewai_event_bus.emit(
            self,
            AgentEvaluationCompletedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, metric_category=metric_category, score=score)
        )

    def emit_evaluation_failed_event(self, agent_role: str, agent_id: str, error: str, task_id: str | None = None):
        crewai_event_bus.emit(
            self,
            AgentEvaluationFailedEvent(agent_role=agent_role, agent_id=agent_id, task_id=task_id, iteration=self._execution_state.iteration, error=error)
        )


def create_default_evaluator(agents: list[Agent], llm: None = None):
    from crewai.experimental.evaluation import (
        GoalAlignmentEvaluator,
        SemanticQualityEvaluator,
        ToolSelectionEvaluator,
        ParameterExtractionEvaluator,
        ToolInvocationEvaluator,
        ReasoningEfficiencyEvaluator
    )

    evaluators = [
        GoalAlignmentEvaluator(llm=llm),
        SemanticQualityEvaluator(llm=llm),
        ToolSelectionEvaluator(llm=llm),
        ParameterExtractionEvaluator(llm=llm),
        ToolInvocationEvaluator(llm=llm),
        ReasoningEfficiencyEvaluator(llm=llm),
    ]

    return AgentEvaluator(evaluators=evaluators, agents=agents)
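The `_execution_state` property above creates one `ExecutionState` per thread on first access. The following is a minimal standalone sketch of that pattern, not the CrewAI class itself: `StateHolder` and `iteration_seen_by_worker` are hypothetical names used only for illustration, and the state object is reduced to two fields.

```python
import threading


class ExecutionState:
    """Stand-in for the per-thread state object; only two fields are kept here."""

    def __init__(self) -> None:
        self.iteration = 1
        self.iterations_results: dict[int, dict[str, list]] = {}


class StateHolder:
    """Hypothetical owner class that lazily creates one ExecutionState per thread."""

    def __init__(self) -> None:
        self._thread_local = threading.local()

    @property
    def execution_state(self) -> ExecutionState:
        # Attributes set on a threading.local() object are visible only to the
        # thread that set them, so each thread builds its own state on first use.
        if not hasattr(self._thread_local, "execution_state"):
            self._thread_local.execution_state = ExecutionState()
        return self._thread_local.execution_state


def iteration_seen_by_worker(holder: StateHolder) -> int:
    """Return the iteration value a fresh worker thread observes on the holder."""
    seen: list[int] = []
    worker = threading.Thread(
        target=lambda: seen.append(holder.execution_state.iteration)
    )
    worker.start()
    worker.join()
    return seen[0]


holder = StateHolder()
holder.execution_state.iteration = 3  # changed in the main thread only
print(holder.execution_state.iteration, iteration_seen_by_worker(holder))
```

One consequence worth keeping in mind when reading the class above: values written during `__init__` live in the constructing thread's state, so a handler that happened to run on a different thread would start from a fresh, empty `ExecutionState`.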


@@ -0,0 +1,125 @@
import abc
import enum
from enum import Enum
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.task import Task
from crewai.llm import BaseLLM
from crewai.utilities.llm_utils import create_llm
class MetricCategory(enum.Enum):
    GOAL_ALIGNMENT = "goal_alignment"
    SEMANTIC_QUALITY = "semantic_quality"
    REASONING_EFFICIENCY = "reasoning_efficiency"
    TOOL_SELECTION = "tool_selection"
    PARAMETER_EXTRACTION = "parameter_extraction"
    TOOL_INVOCATION = "tool_invocation"

    def title(self):
        return self.value.replace('_', ' ').title()


class EvaluationScore(BaseModel):
    score: float | None = Field(
        default=5.0,
        description="Numeric score from 0-10 where 0 is worst and 10 is best, None if not applicable",
        ge=0.0,
        le=10.0
    )
    feedback: str = Field(
        default="",
        description="Detailed feedback explaining the evaluation score"
    )
    raw_response: str | None = Field(
        default=None,
        description="Raw response from the evaluator (e.g., LLM)"
    )

    def __str__(self) -> str:
        if self.score is None:
            return f"Score: N/A - {self.feedback}"
        return f"Score: {self.score:.1f}/10 - {self.feedback}"


class BaseEvaluator(abc.ABC):
    def __init__(self, llm: BaseLLM | None = None):
        self.llm: BaseLLM | None = create_llm(llm)

    @property
    @abc.abstractmethod
    def metric_category(self) -> MetricCategory:
        pass

    @abc.abstractmethod
    def evaluate(
        self,
        agent: Agent,
        execution_trace: Dict[str, Any],
        final_output: Any,
        task: Task | None = None,
    ) -> EvaluationScore:
        pass


class AgentEvaluationResult(BaseModel):
    agent_id: str = Field(description="ID of the evaluated agent")
    task_id: str = Field(description="ID of the task that was executed")
    metrics: Dict[MetricCategory, EvaluationScore] = Field(
        default_factory=dict,
        description="Evaluation scores for each metric category"
    )


class AggregationStrategy(Enum):
    SIMPLE_AVERAGE = "simple_average"  # Equal weight to all tasks
    WEIGHTED_BY_COMPLEXITY = "weighted_by_complexity"  # Weight by task complexity
    BEST_PERFORMANCE = "best_performance"  # Use best scores across tasks
    WORST_PERFORMANCE = "worst_performance"  # Use worst scores across tasks


class AgentAggregatedEvaluationResult(BaseModel):
    agent_id: str = Field(
        default="",
        description="ID of the agent"
    )
    agent_role: str = Field(
        default="",
        description="Role of the agent"
    )
    task_count: int = Field(
        default=0,
        description="Number of tasks included in this aggregation"
    )
    aggregation_strategy: AggregationStrategy = Field(
        default=AggregationStrategy.SIMPLE_AVERAGE,
        description="Strategy used for aggregation"
    )
    metrics: Dict[MetricCategory, EvaluationScore] = Field(
        default_factory=dict,
        description="Aggregated metrics across all tasks"
    )
    task_results: List[str] = Field(
        default_factory=list,
        description="IDs of tasks included in this aggregation"
    )
    overall_score: Optional[float] = Field(
        default=None,
        description="Overall score for this agent"
    )

    def __str__(self) -> str:
        result = f"Agent Evaluation: {self.agent_role}\n"
        result += f"Strategy: {self.aggregation_strategy.value}\n"
        result += f"Tasks evaluated: {self.task_count}\n"

        for category, score in self.metrics.items():
            result += f"\n\n- {category.value.upper()}: {score.score}/10\n"

            if score.feedback:
                detailed_feedback = "\n ".join(score.feedback.split('\n'))
                result += f" {detailed_feedback}\n"

        return result
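To make the score contract above concrete (a 0-10 range, `None` meaning "not applicable", and the two string forms), here is a small sketch using only the standard library. `Score` is a hypothetical dataclass stand-in, not the pydantic `EvaluationScore` model; the range check in `__post_init__` only approximates what the `ge`/`le` constraints express.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Score:
    """Hypothetical stand-in mirroring the fields and __str__ of the model above."""

    score: Optional[float] = 5.0
    feedback: str = ""

    def __post_init__(self) -> None:
        # Approximates the ge=0.0 / le=10.0 bounds; None is allowed and skips the check.
        if self.score is not None and not 0.0 <= self.score <= 10.0:
            raise ValueError("score must be between 0 and 10, or None")

    def __str__(self) -> str:
        if self.score is None:
            return f"Score: N/A - {self.feedback}"
        return f"Score: {self.score:.1f}/10 - {self.feedback}"


applicable = Score(score=7.5, feedback="Clear and on topic")
not_applicable = Score(score=None, feedback="No tools were used")
print(applicable)
print(not_applicable)
```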


@@ -0,0 +1,333 @@
from collections import defaultdict
from typing import Dict, Any, List
from rich.table import Table
from rich.box import HEAVY_EDGE, ROUNDED
from collections.abc import Sequence
from crewai.experimental.evaluation.base_evaluator import AgentAggregatedEvaluationResult, AggregationStrategy, AgentEvaluationResult, MetricCategory
from crewai.experimental.evaluation import EvaluationScore
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from crewai.utilities.llm_utils import create_llm
class EvaluationDisplayFormatter:
def __init__(self):
self.console_formatter = ConsoleFormatter()
def display_evaluation_with_feedback(self, iterations_results: Dict[int, Dict[str, List[Any]]]):
if not iterations_results:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
return
all_agent_roles: set[str] = set()
for iter_results in iterations_results.values():
all_agent_roles.update(iter_results.keys())
for agent_role in sorted(all_agent_roles):
self.console_formatter.print(f"\n[bold cyan]Agent: {agent_role}[/bold cyan]")
for iter_num, results in sorted(iterations_results.items()):
if agent_role not in results or not results[agent_role]:
continue
agent_results = results[agent_role]
agent_id = agent_results[0].agent_id
aggregated_result = self._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=agent_results,
)
self.console_formatter.print(f"\n[bold]Iteration {iter_num}[/bold]")
table = Table(box=ROUNDED)
table.add_column("Metric", style="cyan")
table.add_column("Score (1-10)", justify="center")
table.add_column("Feedback", style="green")
if aggregated_result.metrics:
for metric, evaluation_score in aggregated_result.metrics.items():
score = evaluation_score.score
if isinstance(score, (int, float)):
if score >= 8.0:
score_text = f"[green]{score:.1f}[/green]"
elif score >= 6.0:
score_text = f"[cyan]{score:.1f}[/cyan]"
elif score >= 4.0:
score_text = f"[yellow]{score:.1f}[/yellow]"
else:
score_text = f"[red]{score:.1f}[/red]"
else:
score_text = "[dim]N/A[/dim]"
table.add_section()
table.add_row(
metric.title(),
score_text,
evaluation_score.feedback or ""
)
if aggregated_result.overall_score is not None:
overall_score = aggregated_result.overall_score
if overall_score >= 8.0:
overall_color = "green"
elif overall_score >= 6.0:
overall_color = "cyan"
elif overall_score >= 4.0:
overall_color = "yellow"
else:
overall_color = "red"
table.add_section()
table.add_row(
"Overall Score",
f"[{overall_color}]{overall_score:.1f}[/]",
"Overall agent evaluation score"
)
self.console_formatter.print(table)
def display_summary_results(self, iterations_results: Dict[int, Dict[str, List[AgentAggregatedEvaluationResult]]]):
if not iterations_results:
self.console_formatter.print("[yellow]No evaluation results to display[/yellow]")
return
self.console_formatter.print("\n")
table = Table(title="Agent Performance Scores \n (1-10 Higher is better)", box=HEAVY_EDGE)
table.add_column("Agent/Metric", style="cyan")
for iter_num in sorted(iterations_results.keys()):
run_label = f"Run {iter_num}"
table.add_column(run_label, justify="center")
table.add_column("Avg. Total", justify="center")
all_agent_roles: set[str] = set()
for results in iterations_results.values():
all_agent_roles.update(results.keys())
for agent_role in sorted(all_agent_roles):
agent_scores_by_iteration = {}
agent_metrics_by_iteration = {}
for iter_num, results in sorted(iterations_results.items()):
if agent_role not in results or not results[agent_role]:
continue
agent_results = results[agent_role]
agent_id = agent_results[0].agent_id
aggregated_result = self._aggregate_agent_results(
agent_id=agent_id,
agent_role=agent_role,
results=agent_results,
strategy=AggregationStrategy.SIMPLE_AVERAGE
)
valid_scores = [score.score for score in aggregated_result.metrics.values()
if score.score is not None]
if valid_scores:
avg_score = sum(valid_scores) / len(valid_scores)
agent_scores_by_iteration[iter_num] = avg_score
agent_metrics_by_iteration[iter_num] = aggregated_result.metrics
if not agent_scores_by_iteration:
continue
avg_across_iterations = sum(agent_scores_by_iteration.values()) / len(agent_scores_by_iteration)
row = [f"[bold]{agent_role}[/bold]"]
for iter_num in sorted(iterations_results.keys()):
if iter_num in agent_scores_by_iteration:
score = agent_scores_by_iteration[iter_num]
if score >= 8.0:
color = "green"
elif score >= 6.0:
color = "cyan"
elif score >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[bold {color}]{score:.1f}[/]")
else:
row.append("-")
if avg_across_iterations >= 8.0:
color = "green"
elif avg_across_iterations >= 6.0:
color = "cyan"
elif avg_across_iterations >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[bold {color}]{avg_across_iterations:.1f}[/]")
table.add_row(*row)
all_metrics: set[Any] = set()
for metrics in agent_metrics_by_iteration.values():
all_metrics.update(metrics.keys())
for metric in sorted(all_metrics, key=lambda x: x.value):
metric_scores = []
row = [f" - {metric.title()}"]
for iter_num in sorted(iterations_results.keys()):
if (iter_num in agent_metrics_by_iteration and
metric in agent_metrics_by_iteration[iter_num]):
metric_score = agent_metrics_by_iteration[iter_num][metric].score
if metric_score is not None:
metric_scores.append(metric_score)
if metric_score >= 8.0:
color = "green"
elif metric_score >= 6.0:
color = "cyan"
elif metric_score >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[{color}]{metric_score:.1f}[/]")
else:
row.append("[dim]N/A[/dim]")
else:
row.append("-")
if metric_scores:
avg = sum(metric_scores) / len(metric_scores)
if avg >= 8.0:
color = "green"
elif avg >= 6.0:
color = "cyan"
elif avg >= 4.0:
color = "yellow"
else:
color = "red"
row.append(f"[{color}]{avg:.1f}[/]")
else:
row.append("-")
table.add_row(*row)
table.add_row(*[""] * (len(sorted(iterations_results.keys())) + 2))
self.console_formatter.print(table)
self.console_formatter.print("\n")
def _aggregate_agent_results(
self,
agent_id: str,
agent_role: str,
results: Sequence[AgentEvaluationResult],
strategy: AggregationStrategy = AggregationStrategy.SIMPLE_AVERAGE,
) -> AgentAggregatedEvaluationResult:
metrics_by_category: dict[MetricCategory, list[EvaluationScore]] = defaultdict(list)
for result in results:
for metric_name, evaluation_score in result.metrics.items():
metrics_by_category[metric_name].append(evaluation_score)
aggregated_metrics: dict[MetricCategory, EvaluationScore] = {}
for category, scores in metrics_by_category.items():
valid_scores = [s.score for s in scores if s.score is not None]
avg_score = sum(valid_scores) / len(valid_scores) if valid_scores else None
feedbacks = [s.feedback for s in scores if s.feedback]
feedback_summary = None
if feedbacks:
if len(feedbacks) > 1:
feedback_summary = self._summarize_feedbacks(
agent_role=agent_role,
metric=category.title(),
feedbacks=feedbacks,
scores=[s.score for s in scores],
strategy=strategy
)
else:
feedback_summary = feedbacks[0]
aggregated_metrics[category] = EvaluationScore(
score=avg_score,
feedback=feedback_summary or ""  # feedback is typed str; avoid passing None when no feedback exists
)
overall_score = None
if aggregated_metrics:
valid_scores = [m.score for m in aggregated_metrics.values() if m.score is not None]
if valid_scores:
overall_score = sum(valid_scores) / len(valid_scores)
return AgentAggregatedEvaluationResult(
agent_id=agent_id,
agent_role=agent_role,
metrics=aggregated_metrics,
overall_score=overall_score,
task_count=len(results),
aggregation_strategy=strategy
)
def _summarize_feedbacks(
self,
agent_role: str,
metric: str,
feedbacks: List[str],
scores: List[float | None],
strategy: AggregationStrategy
) -> str:
if len(feedbacks) <= 2 and all(len(fb) < 200 for fb in feedbacks):
return "\n\n".join([f"Feedback {i+1}: {fb}" for i, fb in enumerate(feedbacks)])
try:
llm = create_llm()
formatted_feedbacks = []
for i, (feedback, score) in enumerate(zip(feedbacks, scores)):
if len(feedback) > 500:
feedback = feedback[:500] + "..."
score_text = f"{score:.1f}" if score is not None else "N/A"
formatted_feedbacks.append(f"Feedback #{i+1} (Score: {score_text}):\n{feedback}")
all_feedbacks = "\n\n" + "\n\n---\n\n".join(formatted_feedbacks)
strategy_guidance = ""
if strategy == AggregationStrategy.BEST_PERFORMANCE:
strategy_guidance = "Focus on the highest-scoring aspects and strengths demonstrated."
elif strategy == AggregationStrategy.WORST_PERFORMANCE:
strategy_guidance = "Focus on areas that need improvement and common issues across tasks."
else:
strategy_guidance = "Provide a balanced analysis of strengths and weaknesses across all tasks."
prompt = [
{"role": "system", "content": f"""You are an expert evaluator creating a comprehensive summary of agent performance feedback.
Your job is to synthesize multiple feedback points about the same metric across different tasks.
Create a concise, insightful summary that captures the key patterns and themes from all feedback.
{strategy_guidance}
Your summary should be:
1. Specific and concrete (not vague or general)
2. Focused on actionable insights
3. Highlighting patterns across tasks
4. 150-250 words in length
The summary should be directly usable as final feedback for the agent's performance on this metric."""},
{"role": "user", "content": f"""I need a synthesized summary of the following feedback for:
Agent Role: {agent_role}
Metric: {metric.title()}
{all_feedbacks}
"""}
]
assert llm is not None
response = llm.call(prompt)
return response
except Exception:
return "Synthesized from multiple tasks: " + "\n\n".join([f"- {fb[:500]}..." for fb in feedbacks])
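The display code above repeats the same threshold chain (8.0, 6.0, 4.0) and the same "average the non-None scores" step in several places. A minimal sketch of how those two pieces could be factored into helpers follows; `score_color`, `average_valid`, and `render_score` are hypothetical names and are not part of the module shown in this diff.

```python
from typing import Optional, Sequence


def score_color(score: float) -> str:
    """Map a score to the colour band used by the tables above."""
    if score >= 8.0:
        return "green"
    if score >= 6.0:
        return "cyan"
    if score >= 4.0:
        return "yellow"
    return "red"


def average_valid(scores: Sequence[Optional[float]]) -> Optional[float]:
    """Average the scores that are not None; return None when none are usable."""
    valid = [s for s in scores if s is not None]
    return sum(valid) / len(valid) if valid else None


def render_score(score: Optional[float]) -> str:
    """Build the Rich markup string for a single table cell."""
    if score is None:
        return "[dim]N/A[/dim]"
    return f"[{score_color(score)}]{score:.1f}[/]"


print(render_score(average_valid([None, 4.0, 8.0])))
```

Returning `None` from the averaging helper, rather than 0, keeps "no applicable score" distinct from a genuinely low score, which matches how the code above renders `N/A`.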


@@ -0,0 +1,234 @@
from datetime import datetime
from typing import Any, Dict, Optional
from collections.abc import Sequence
from crewai.agent import Agent
from crewai.task import Task
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus
from crewai.utilities.events.agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
LiteAgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent
)
from crewai.utilities.events.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
ToolExecutionErrorEvent,
ToolSelectionErrorEvent,
ToolValidateInputErrorEvent
)
from crewai.utilities.events.llm_events import (
LLMCallStartedEvent,
LLMCallCompletedEvent
)
class EvaluationTraceCallback(BaseEventListener):
    """Event listener for collecting execution traces for evaluation.

    This listener attaches to the event bus to collect detailed information
    about the execution process, including agent steps, tool uses, knowledge
    retrievals, and final output - all for use in agent evaluation.
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if not hasattr(self, "_initialized") or not self._initialized:
            super().__init__()
            self.traces = {}
            self.current_agent_id = None
            self.current_task_id = None
            self._initialized = True

    def setup_listeners(self, event_bus: CrewAIEventsBus):
        @event_bus.on(AgentExecutionStartedEvent)
        def on_agent_started(source, event: AgentExecutionStartedEvent):
            self.on_agent_start(event.agent, event.task)

        @event_bus.on(LiteAgentExecutionStartedEvent)
        def on_lite_agent_started(source, event: LiteAgentExecutionStartedEvent):
            self.on_lite_agent_start(event.agent_info)

        @event_bus.on(AgentExecutionCompletedEvent)
        def on_agent_completed(source, event: AgentExecutionCompletedEvent):
            self.on_agent_finish(event.agent, event.task, event.output)

        @event_bus.on(LiteAgentExecutionCompletedEvent)
        def on_lite_agent_completed(source, event: LiteAgentExecutionCompletedEvent):
            self.on_lite_agent_finish(event.output)

        @event_bus.on(ToolUsageFinishedEvent)
        def on_tool_completed(source, event: ToolUsageFinishedEvent):
            self.on_tool_use(event.tool_name, event.tool_args, event.output, success=True)

        @event_bus.on(ToolUsageErrorEvent)
        def on_tool_usage_error(source, event: ToolUsageErrorEvent):
            self.on_tool_use(event.tool_name, event.tool_args, event.error,
                             success=False, error_type="usage_error")

        @event_bus.on(ToolExecutionErrorEvent)
        def on_tool_execution_error(source, event: ToolExecutionErrorEvent):
            self.on_tool_use(event.tool_name, event.tool_args, event.error,
                             success=False, error_type="execution_error")

        @event_bus.on(ToolSelectionErrorEvent)
        def on_tool_selection_error(source, event: ToolSelectionErrorEvent):
            self.on_tool_use(event.tool_name, event.tool_args, event.error,
                             success=False, error_type="selection_error")

        @event_bus.on(ToolValidateInputErrorEvent)
        def on_tool_validate_input_error(source, event: ToolValidateInputErrorEvent):
            self.on_tool_use(event.tool_name, event.tool_args, event.error,
                             success=False, error_type="validation_error")

        @event_bus.on(LLMCallStartedEvent)
        def on_llm_call_started(source, event: LLMCallStartedEvent):
            self.on_llm_call_start(event.messages, event.tools)

        @event_bus.on(LLMCallCompletedEvent)
        def on_llm_call_completed(source, event: LLMCallCompletedEvent):
            self.on_llm_call_end(event.messages, event.response)

    def on_lite_agent_start(self, agent_info: dict[str, Any]):
        self.current_agent_id = agent_info['id']
        self.current_task_id = "lite_task"

        trace_key = f"{self.current_agent_id}_{self.current_task_id}"
        self._init_trace(
            trace_key=trace_key,
            agent_id=self.current_agent_id,
            task_id=self.current_task_id,
            tool_uses=[],
            llm_calls=[],
            start_time=datetime.now(),
            final_output=None
        )

    def _init_trace(self, trace_key: str, **kwargs: Any):
        self.traces[trace_key] = kwargs

    def on_agent_start(self, agent: Agent, task: Task):
        self.current_agent_id = agent.id
        self.current_task_id = task.id

        trace_key = f"{agent.id}_{task.id}"
        self._init_trace(
            trace_key=trace_key,
            agent_id=agent.id,
            task_id=task.id,
            tool_uses=[],
            llm_calls=[],
            start_time=datetime.now(),
            final_output=None
        )

    def on_agent_finish(self, agent: Agent, task: Task, output: Any):
        trace_key = f"{agent.id}_{task.id}"
        if trace_key in self.traces:
            self.traces[trace_key]["final_output"] = output
            self.traces[trace_key]["end_time"] = datetime.now()

        self._reset_current()

    def _reset_current(self):
        self.current_agent_id = None
        self.current_task_id = None

    def on_lite_agent_finish(self, output: Any):
        trace_key = f"{self.current_agent_id}_lite_task"
        if trace_key in self.traces:
            self.traces[trace_key]["final_output"] = output
            self.traces[trace_key]["end_time"] = datetime.now()

        self._reset_current()

    def on_tool_use(self, tool_name: str, tool_args: dict[str, Any] | str, result: Any,
                    success: bool = True, error_type: str | None = None):
        if not self.current_agent_id or not self.current_task_id:
            return

        trace_key = f"{self.current_agent_id}_{self.current_task_id}"
        if trace_key in self.traces:
            tool_use = {
                "tool": tool_name,
                "args": tool_args,
                "result": result,
                "success": success,
                "timestamp": datetime.now()
            }

            # Add error information if applicable
            if not success and error_type:
                tool_use["error"] = True
                tool_use["error_type"] = error_type

            self.traces[trace_key]["tool_uses"].append(tool_use)

    def on_llm_call_start(self, messages: str | Sequence[dict[str, Any]] | None, tools: Sequence[dict[str, Any]] | None = None):
        if not self.current_agent_id or not self.current_task_id:
            return

        trace_key = f"{self.current_agent_id}_{self.current_task_id}"
        if trace_key not in self.traces:
            return

        self.current_llm_call = {
            "messages": messages,
            "tools": tools,
            "start_time": datetime.now(),
            "response": None,
            "end_time": None
        }

    def on_llm_call_end(self, messages: str | list[dict[str, Any]] | None, response: Any):
        if not self.current_agent_id or not self.current_task_id:
            return

        trace_key = f"{self.current_agent_id}_{self.current_task_id}"
        if trace_key not in self.traces:
            return

        total_tokens = 0
        if hasattr(response, "usage") and hasattr(response.usage, "total_tokens"):
            total_tokens = response.usage.total_tokens

        current_time = datetime.now()
        start_time = None
        if hasattr(self, "current_llm_call") and self.current_llm_call:
            start_time = self.current_llm_call.get("start_time")

        if not start_time:
            start_time = current_time

        llm_call = {
            "messages": messages,
            "response": response,
            "start_time": start_time,
            "end_time": current_time,
            "total_tokens": total_tokens
        }

        self.traces[trace_key]["llm_calls"].append(llm_call)

        if hasattr(self, "current_llm_call"):
            self.current_llm_call = {}

    def get_trace(self, agent_id: str, task_id: str) -> Optional[Dict[str, Any]]:
        trace_key = f"{agent_id}_{task_id}"
        return self.traces.get(trace_key)


def create_evaluation_callbacks() -> EvaluationTraceCallback:
    from crewai.utilities.events.crewai_event_bus import crewai_event_bus

    callback = EvaluationTraceCallback()
    callback.setup_listeners(crewai_event_bus)
    return callback
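The listener above combines two ideas: a `__new__`-based singleton with a guarded `__init__`, and traces stored under an `"{agent_id}_{task_id}"` key. The sketch below isolates both without any CrewAI imports. `TraceCollector` and its `start` method are hypothetical stand-ins for illustration, and the trace dictionary is trimmed to three fields.

```python
class TraceCollector:
    """Hypothetical stand-in for a singleton listener that stores traces by key."""

    _instance = None

    def __new__(cls):
        # Create the single instance on first call; later calls return the same object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self) -> None:
        # __init__ runs on every TraceCollector() call, so one-time setup is guarded.
        if not self._initialized:
            self.traces: dict[str, dict] = {}
            self._initialized = True

    def start(self, agent_id: str, task_id: str) -> str:
        """Create an empty trace and return its key."""
        key = f"{agent_id}_{task_id}"
        self.traces[key] = {"tool_uses": [], "llm_calls": [], "final_output": None}
        return key

    def get_trace(self, agent_id: str, task_id: str):
        """Return the trace for this agent/task pair, or None if none was started."""
        return self.traces.get(f"{agent_id}_{task_id}")


first = TraceCollector()
first.start("agent-1", "task-1")
second = TraceCollector()  # same object; existing traces are preserved
print(first is second, second.get_trace("agent-1", "task-1") is not None)
```

Without the `_initialized` guard, the second `TraceCollector()` call would reset `traces` to an empty dictionary, which is the situation the guard in the listener above avoids.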


@@ -0,0 +1,8 @@
from crewai.experimental.evaluation.experiment.runner import ExperimentRunner
from crewai.experimental.evaluation.experiment.result import ExperimentResults, ExperimentResult

__all__ = [
    "ExperimentRunner",
    "ExperimentResults",
    "ExperimentResult"
]


@@ -0,0 +1,122 @@
import json
import os
from datetime import datetime, timezone
from typing import Any
from pydantic import BaseModel
class ExperimentResult(BaseModel):
identifier: str
inputs: dict[str, Any]
score: int | dict[str, int | float]
expected_score: int | dict[str, int | float]
passed: bool
agent_evaluations: dict[str, Any] | None = None
class ExperimentResults:
def __init__(self, results: list[ExperimentResult], metadata: dict[str, Any] | None = None):
self.results = results
self.metadata = metadata or {}
self.timestamp = datetime.now(timezone.utc)
from crewai.experimental.evaluation.experiment.result_display import ExperimentResultsDisplay
self.display = ExperimentResultsDisplay()
def to_json(self, filepath: str | None = None) -> dict[str, Any]:
data = {
"timestamp": self.timestamp.isoformat(),
"metadata": self.metadata,
"results": [r.model_dump(exclude={"agent_evaluations"}) for r in self.results]
}
if filepath:
with open(filepath, 'w') as f:
json.dump(data, f, indent=2)
self.display.console.print(f"[green]Results saved to {filepath}[/green]")
return data
def compare_with_baseline(self, baseline_filepath: str, save_current: bool = True, print_summary: bool = False) -> dict[str, Any]:
baseline_runs = []
if os.path.exists(baseline_filepath) and os.path.getsize(baseline_filepath) > 0:
try:
with open(baseline_filepath, 'r') as f:
baseline_data = json.load(f)
if isinstance(baseline_data, dict) and "timestamp" in baseline_data:
baseline_runs = [baseline_data]
elif isinstance(baseline_data, list):
baseline_runs = baseline_data
except (json.JSONDecodeError, FileNotFoundError) as e:
self.display.console.print(f"[yellow]Warning: Could not load baseline file: {str(e)}[/yellow]")
if not baseline_runs:
if save_current:
current_data = self.to_json()
with open(baseline_filepath, 'w') as f:
json.dump([current_data], f, indent=2)
self.display.console.print(f"[green]Saved current results as new baseline to {baseline_filepath}[/green]")
return {"is_baseline": True, "changes": {}}
baseline_runs.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
latest_run = baseline_runs[0]
comparison = self._compare_with_run(latest_run)
if print_summary:
self.display.comparison_summary(comparison, latest_run["timestamp"])
if save_current:
current_data = self.to_json()
baseline_runs.append(current_data)
with open(baseline_filepath, 'w') as f:
json.dump(baseline_runs, f, indent=2)
self.display.console.print(f"[green]Added current results to baseline file {baseline_filepath}[/green]")
return comparison
def _compare_with_run(self, baseline_run: dict[str, Any]) -> dict[str, Any]:
baseline_results = baseline_run.get("results", [])
baseline_lookup = {}
for result in baseline_results:
test_identifier = result.get("identifier")
if test_identifier:
baseline_lookup[test_identifier] = result
improved = []
regressed = []
unchanged = []
new_tests = []
for result in self.results:
test_identifier = result.identifier
if not test_identifier or test_identifier not in baseline_lookup:
new_tests.append(test_identifier)
continue
baseline_result = baseline_lookup[test_identifier]
baseline_passed = baseline_result.get("passed", False)
if result.passed and not baseline_passed:
improved.append(test_identifier)
elif not result.passed and baseline_passed:
regressed.append(test_identifier)
else:
unchanged.append(test_identifier)
missing_tests = []
current_test_identifiers = {result.identifier for result in self.results}
for result in baseline_results:
test_identifier = result.get("identifier")
if test_identifier and test_identifier not in current_test_identifiers:
missing_tests.append(test_identifier)
return {
"improved": improved,
"regressed": regressed,
"unchanged": unchanged,
"new_tests": new_tests,
"missing_tests": missing_tests,
"total_compared": len(improved) + len(regressed) + len(unchanged),
"baseline_timestamp": baseline_run.get("timestamp", "unknown")
}
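
The bucketing done by `_compare_with_run` can be sketched in isolation. This is a minimal illustration on plain dicts, not the class method itself: `compare_runs` and the sample data are hypothetical names introduced here, and the logic assumes each result carries only `identifier` and `passed`.

```python
from typing import Any


def compare_runs(current: list[dict[str, Any]], baseline: list[dict[str, Any]]) -> dict[str, list]:
    """Classify test identifiers by how their pass/fail status changed."""
    # Index baseline results by identifier; entries without one are ignored.
    baseline_lookup = {r["identifier"]: r for r in baseline if r.get("identifier")}
    buckets: dict[str, list] = {
        "improved": [], "regressed": [], "unchanged": [],
        "new_tests": [], "missing_tests": [],
    }
    for result in current:
        identifier = result.get("identifier")
        if not identifier or identifier not in baseline_lookup:
            buckets["new_tests"].append(identifier)
            continue
        was_passing = baseline_lookup[identifier].get("passed", False)
        now_passing = result.get("passed", False)
        if now_passing and not was_passing:
            buckets["improved"].append(identifier)
        elif was_passing and not now_passing:
            buckets["regressed"].append(identifier)
        else:
            buckets["unchanged"].append(identifier)
    # Anything in the baseline that the current run did not execute.
    current_ids = {r.get("identifier") for r in current}
    buckets["missing_tests"] = [i for i in baseline_lookup if i not in current_ids]
    return buckets


# Hypothetical sample data for illustration only.
baseline = [
    {"identifier": "a", "passed": False},
    {"identifier": "b", "passed": True},
    {"identifier": "c", "passed": True},
]
current = [
    {"identifier": "a", "passed": True},
    {"identifier": "b", "passed": False},
    {"identifier": "d", "passed": True},
]
report = compare_runs(current, baseline)
```

Note that only pass/fail transitions are tracked; a score that drops while still passing lands in `unchanged`.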

@@ -0,0 +1,70 @@
from typing import Dict, Any
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from crewai.experimental.evaluation.experiment.result import ExperimentResults
class ExperimentResultsDisplay:
def __init__(self):
self.console = Console()
def summary(self, experiment_results: ExperimentResults):
total = len(experiment_results.results)
passed = sum(1 for r in experiment_results.results if r.passed)
table = Table(title="Experiment Summary")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green")
table.add_row("Total Test Cases", str(total))
table.add_row("Passed", str(passed))
table.add_row("Failed", str(total - passed))
table.add_row("Success Rate", f"{(passed / total * 100):.1f}%" if total > 0 else "N/A")
self.console.print(table)
def comparison_summary(self, comparison: Dict[str, Any], baseline_timestamp: str):
self.console.print(Panel(f"[bold]Comparison with baseline run from {baseline_timestamp}[/bold]",
expand=False))
table = Table(title="Results Comparison")
table.add_column("Metric", style="cyan")
table.add_column("Count", style="white")
table.add_column("Details", style="dim")
improved = comparison.get("improved", [])
if improved:
details = ", ".join([f"{test_identifier}" for test_identifier in improved[:3]])
if len(improved) > 3:
details += f" and {len(improved) - 3} more"
table.add_row("✅ Improved", str(len(improved)), details)
else:
table.add_row("✅ Improved", "0", "")
regressed = comparison.get("regressed", [])
if regressed:
details = ", ".join([f"{test_identifier}" for test_identifier in regressed[:3]])
if len(regressed) > 3:
details += f" and {len(regressed) - 3} more"
table.add_row("❌ Regressed", str(len(regressed)), details, style="red")
else:
table.add_row("❌ Regressed", "0", "")
unchanged = comparison.get("unchanged", [])
table.add_row("⏺ Unchanged", str(len(unchanged)), "")
new_tests = comparison.get("new_tests", [])
if new_tests:
details = ", ".join(new_tests[:3])
if len(new_tests) > 3:
details += f" and {len(new_tests) - 3} more"
table.add_row(" New Tests", str(len(new_tests)), details)
missing_tests = comparison.get("missing_tests", [])
if missing_tests:
details = ", ".join(missing_tests[:3])
if len(missing_tests) > 3:
details += f" and {len(missing_tests) - 3} more"
table.add_row(" Missing Tests", str(len(missing_tests)), details)
self.console.print(table)

@@ -0,0 +1,125 @@
from collections import defaultdict
from hashlib import md5
from typing import Any
from crewai import Crew, Agent
from crewai.experimental.evaluation import AgentEvaluator, create_default_evaluator
from crewai.experimental.evaluation.experiment.result_display import ExperimentResultsDisplay
from crewai.experimental.evaluation.experiment.result import ExperimentResults, ExperimentResult
from crewai.experimental.evaluation.evaluation_display import AgentAggregatedEvaluationResult
class ExperimentRunner:
def __init__(self, dataset: list[dict[str, Any]]):
self.dataset = dataset or []
self.evaluator: AgentEvaluator | None = None
self.display = ExperimentResultsDisplay()
def run(self, crew: Crew | None = None, agents: list[Agent] | None = None, print_summary: bool = False) -> ExperimentResults:
if crew and not agents:
agents = crew.agents
assert agents is not None
self.evaluator = create_default_evaluator(agents=agents)
results = []
for test_case in self.dataset:
self.evaluator.reset_iterations_results()
result = self._run_test_case(test_case=test_case, crew=crew, agents=agents)
results.append(result)
experiment_results = ExperimentResults(results)
if print_summary:
self.display.summary(experiment_results)
return experiment_results
def _run_test_case(self, test_case: dict[str, Any], agents: list[Agent], crew: Crew | None = None) -> ExperimentResult:
inputs = test_case["inputs"]
expected_score = test_case["expected_score"]
identifier = test_case.get("identifier") or md5(str(test_case).encode(), usedforsecurity=False).hexdigest()
try:
self.display.console.print(f"[dim]Running crew with input: {str(inputs)[:50]}...[/dim]")
self.display.console.print("\n")
if crew:
crew.kickoff(inputs=inputs)
else:
for agent in agents:
agent.kickoff(**inputs)
assert self.evaluator is not None
agent_evaluations = self.evaluator.get_agent_evaluation()
actual_score = self._extract_scores(agent_evaluations)
passed = self._assert_scores(expected_score, actual_score)
return ExperimentResult(
identifier=identifier,
inputs=inputs,
score=actual_score,
expected_score=expected_score,
passed=passed,
agent_evaluations=agent_evaluations
)
except Exception as e:
self.display.console.print(f"[red]Error running test case: {str(e)}[/red]")
return ExperimentResult(
identifier=identifier,
inputs=inputs,
score=0,
expected_score=expected_score,
passed=False
)
def _extract_scores(self, agent_evaluations: dict[str, AgentAggregatedEvaluationResult]) -> float | dict[str, float]:
all_scores: dict[str, list[float]] = defaultdict(list)
for evaluation in agent_evaluations.values():
for metric_name, score in evaluation.metrics.items():
if score.score is not None:
all_scores[metric_name.value].append(score.score)
avg_scores = {m: sum(s)/len(s) for m, s in all_scores.items()}
if len(avg_scores) == 1:
return list(avg_scores.values())[0]
return avg_scores
def _assert_scores(self, expected: float | dict[str, float],
actual: float | dict[str, float]) -> bool:
"""
Compare expected and actual scores, and return whether the test case passed.
The rules for comparison are as follows:
- If both expected and actual scores are single numbers, the actual score must be >= expected.
- If expected is a single number and actual is a dict, compare against the average of actual values.
- If expected is a dict and actual is a single number, actual must be >= all expected values.
- If both are dicts, every key present in both must have actual >= expected; at least one shared key is required unless expected is empty.
"""
if isinstance(expected, (int, float)) and isinstance(actual, (int, float)):
return actual >= expected
if isinstance(expected, dict) and isinstance(actual, (int, float)):
return all(actual >= exp_score for exp_score in expected.values())
if isinstance(expected, (int, float)) and isinstance(actual, dict):
if not actual:
return False
avg_score = sum(actual.values()) / len(actual)
return avg_score >= expected
if isinstance(expected, dict) and isinstance(actual, dict):
if not expected:
return True
matching_keys = set(expected.keys()) & set(actual.keys())
if not matching_keys:
return False
# All matching keys must have actual >= expected
return all(actual[key] >= expected[key] for key in matching_keys)
return False
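
The four comparison rules in the `_assert_scores` docstring can be exercised with a standalone function. This is a sketch that mirrors the logic above; `scores_pass` and the metric keys used in the calls are hypothetical names chosen for illustration.

```python
from __future__ import annotations


def scores_pass(expected: float | dict[str, float], actual: float | dict[str, float]) -> bool:
    """Return True when the actual score(s) meet the expected threshold(s)."""
    if isinstance(expected, (int, float)) and isinstance(actual, (int, float)):
        return actual >= expected
    if isinstance(expected, dict) and isinstance(actual, (int, float)):
        # A single actual score must clear every expected threshold.
        return all(actual >= threshold for threshold in expected.values())
    if isinstance(expected, (int, float)) and isinstance(actual, dict):
        # Compare the threshold against the mean of the actual scores.
        return bool(actual) and sum(actual.values()) / len(actual) >= expected
    if isinstance(expected, dict) and isinstance(actual, dict):
        if not expected:
            return True
        shared = expected.keys() & actual.keys()
        # Only keys present on both sides are compared.
        return bool(shared) and all(actual[k] >= expected[k] for k in shared)
    return False


number_vs_number = scores_pass(7, 8.5)
dict_vs_number = scores_pass({"metric_a": 7, "metric_b": 9}, 8)
number_vs_dict = scores_pass(7, {"metric_a": 6, "metric_b": 9})
partial_overlap = scores_pass({"metric_a": 7, "metric_b": 9}, {"metric_a": 8})
no_overlap = scores_pass({"metric_a": 7}, {"metric_b": 10})
```

The `partial_overlap` case is worth noticing: an expected metric that is absent from the actual scores does not fail the test case, because only shared keys are checked.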

@@ -0,0 +1,30 @@
"""Robust JSON parsing utilities for evaluation responses."""
import json
import re
from typing import Any
def extract_json_from_llm_response(text: str) -> dict[str, Any]:
try:
return json.loads(text)
except json.JSONDecodeError:
pass
json_patterns = [
# Standard markdown code blocks with json
r'```json\s*([\s\S]*?)\s*```',
# Code blocks without language specifier
r'```\s*([\s\S]*?)\s*```',
# Inline code with JSON
r'`([{\[].*[}\]])`',
]
for pattern in json_patterns:
matches = re.findall(pattern, text, re.IGNORECASE | re.DOTALL)
for match in matches:
try:
return json.loads(match.strip())
except json.JSONDecodeError:
continue
raise ValueError("No valid JSON found in the response")
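
The fallback order used by `extract_json_from_llm_response` (raw parse, then fenced blocks, then inline code) can be tried out with the self-contained sketch below. `extract_json` and `FENCE` are names introduced for this example; the fence marker is built programmatically only so the example can be embedded in a fenced block itself.

```python
import json
import re
from typing import Any

FENCE = "`" * 3  # three backticks, assembled to avoid a literal fence in this example


def extract_json(text: str) -> dict[str, Any]:
    """Parse JSON from a model reply, tolerating markdown wrappers."""
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    patterns = [
        FENCE + r"json\s*([\s\S]*?)\s*" + FENCE,  # fenced block tagged as json
        FENCE + r"\s*([\s\S]*?)\s*" + FENCE,      # fenced block without a language tag
        r"`([{\[].*[}\]])`",                      # inline code holding an object or array
    ]
    for pattern in patterns:
        for candidate in re.findall(pattern, text, re.IGNORECASE | re.DOTALL):
            try:
                return json.loads(candidate.strip())
            except json.JSONDecodeError:
                continue
    raise ValueError("No valid JSON found in the response")


fenced_reply = "Here is my evaluation:\n" + FENCE + 'json\n{"score": 8, "feedback": "Clear."}\n' + FENCE
inline_reply = 'Result: `{"score": 3}`'

from_fenced = extract_json(fenced_reply)
from_inline = extract_json(inline_reply)
```

A reply with no parseable JSON raises `ValueError`, which is why the evaluators wrap the call in `try`/`except` and fall back to `score=None`.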

@@ -0,0 +1,26 @@
from crewai.experimental.evaluation.metrics.reasoning_metrics import (
ReasoningEfficiencyEvaluator
)
from crewai.experimental.evaluation.metrics.tools_metrics import (
ToolSelectionEvaluator,
ParameterExtractionEvaluator,
ToolInvocationEvaluator
)
from crewai.experimental.evaluation.metrics.goal_metrics import (
GoalAlignmentEvaluator
)
from crewai.experimental.evaluation.metrics.semantic_quality_metrics import (
SemanticQualityEvaluator
)
__all__ = [
"ReasoningEfficiencyEvaluator",
"ToolSelectionEvaluator",
"ParameterExtractionEvaluator",
"ToolInvocationEvaluator",
"GoalAlignmentEvaluator",
"SemanticQualityEvaluator"
]

@@ -0,0 +1,69 @@
from typing import Any, Dict
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
class GoalAlignmentEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.GOAL_ALIGNMENT
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}\nExpected output: {task.expected_output}\n"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent's output aligns with its assigned task goal.
Score the agent's goal alignment on a scale from 0-10 where:
- 0: Complete misalignment, agent did not understand or attempt the task goal
- 5: Partial alignment, agent attempted the task but missed key requirements
- 10: Perfect alignment, agent fully satisfied all task requirements
Consider:
1. Did the agent correctly interpret the task goal?
2. Did the final output directly address the requirements?
3. Did the agent focus on relevant aspects of the task?
4. Did the agent provide all requested information or deliverables?
Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string).
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
Agent goal: {agent.goal}
{task_context}
Agent's final output:
{final_output}
Evaluate how well the agent's output aligns with the assigned task goal.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
assert evaluation_data is not None
return EvaluationScore(
score=evaluation_data.get("score", 0),
feedback=evaluation_data.get("feedback", response),
raw_response=response
)
except Exception:
return EvaluationScore(
score=None,
feedback=f"Failed to parse evaluation. Raw response: {response}",
raw_response=response
)

@@ -0,0 +1,361 @@
"""Agent reasoning efficiency evaluators.
This module provides evaluator implementations for:
- Reasoning efficiency
- Loop detection
- Thinking-to-action ratio
"""
import logging
import re
from enum import Enum
from typing import Any, Dict, List, Tuple
import numpy as np
from collections.abc import Sequence
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.tasks.task_output import TaskOutput
class ReasoningPatternType(Enum):
EFFICIENT = "efficient" # Good reasoning flow
LOOP = "loop" # Agent is stuck in a loop
VERBOSE = "verbose" # Agent is unnecessarily verbose
INDECISIVE = "indecisive" # Agent struggles to make decisions
SCATTERED = "scattered" # Agent jumps between topics without focus
class ReasoningEfficiencyEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.REASONING_EFFICIENCY
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: TaskOutput | str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}\nExpected output: {task.expected_output}\n"
llm_calls = execution_trace.get("llm_calls", [])
if not llm_calls or len(llm_calls) < 2:
return EvaluationScore(
score=None,
feedback="Insufficient LLM calls to evaluate reasoning efficiency."
)
total_calls = len(llm_calls)
total_tokens = sum(call.get("total_tokens", 0) for call in llm_calls)
avg_tokens_per_call = total_tokens / total_calls if total_calls > 0 else 0
time_intervals = []
has_reliable_timing = True
for i in range(1, len(llm_calls)):
start_time = llm_calls[i-1].get("end_time")
end_time = llm_calls[i].get("start_time")
if start_time and end_time and start_time != end_time:
try:
interval = end_time - start_time
time_intervals.append(interval.total_seconds() if hasattr(interval, 'total_seconds') else 0)
except Exception:
has_reliable_timing = False
else:
has_reliable_timing = False
loop_detected, loop_details = self._detect_loops(llm_calls)
pattern_analysis = self._analyze_reasoning_patterns(llm_calls)
efficiency_metrics = {
"total_llm_calls": total_calls,
"total_tokens": total_tokens,
"avg_tokens_per_call": avg_tokens_per_call,
"reasoning_pattern": pattern_analysis["primary_pattern"].value,
"loops_detected": loop_detected,
}
if has_reliable_timing and time_intervals:
efficiency_metrics["avg_time_between_calls"] = np.mean(time_intervals)
loop_info = f"Detected {len(loop_details)} potential reasoning loops." if loop_detected else "No significant reasoning loops detected."
call_samples = self._get_call_samples(llm_calls)
final_output = final_output.raw if isinstance(final_output, TaskOutput) else final_output
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the reasoning efficiency of an AI agent's thought process.
Evaluate the agent's reasoning efficiency across these five key subcategories:
1. Focus (0-10): How well the agent stays on topic and avoids unnecessary tangents
2. Progression (0-10): How effectively the agent builds on previous thoughts rather than repeating or circling
3. Decision Quality (0-10): How decisively and appropriately the agent makes decisions
4. Conciseness (0-10): How efficiently the agent communicates without unnecessary verbosity
5. Loop Avoidance (0-10): How well the agent avoids getting stuck in repetitive thinking patterns
For each subcategory, provide a score from 0-10 where:
- 0: Completely inefficient
- 5: Moderately efficient
- 10: Highly efficient
The overall score should be a weighted average of these subcategories.
Return your evaluation as JSON with the following structure:
{
"overall_score": float,
"scores": {
"focus": float,
"progression": float,
"decision_quality": float,
"conciseness": float,
"loop_avoidance": float
},
"feedback": string (general feedback about overall reasoning efficiency),
"optimization_suggestions": string (concrete suggestions for improving reasoning efficiency),
"detected_patterns": string (describe any inefficient reasoning patterns you observe)
}"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Reasoning efficiency metrics:
- Total LLM calls: {efficiency_metrics["total_llm_calls"]}
- Average tokens per call: {efficiency_metrics["avg_tokens_per_call"]:.1f}
- Primary reasoning pattern: {efficiency_metrics["reasoning_pattern"]}
- {loop_info}
{"- Average time between calls: {:.2f} seconds".format(efficiency_metrics.get("avg_time_between_calls", 0)) if "avg_time_between_calls" in efficiency_metrics else ""}
Sample of agent reasoning flow (chronological sequence):
{call_samples}
Agent's final output:
{final_output[:500]}... (truncated)
Evaluate the reasoning efficiency of this agent based on these interaction patterns.
Identify any inefficient reasoning patterns and provide specific suggestions for optimization.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
scores = evaluation_data.get("scores", {})
focus = scores.get("focus", 5.0)
progression = scores.get("progression", 5.0)
decision_quality = scores.get("decision_quality", 5.0)
conciseness = scores.get("conciseness", 5.0)
loop_avoidance = scores.get("loop_avoidance", 5.0)
overall_score = evaluation_data.get("overall_score", evaluation_data.get("score", 5.0))
feedback = evaluation_data.get("feedback", "No detailed feedback provided.")
optimization_suggestions = evaluation_data.get("optimization_suggestions", "No specific suggestions provided.")
detailed_feedback = "Reasoning Efficiency Evaluation:\n"
detailed_feedback += f"• Focus: {focus}/10 - Staying on topic without tangents\n"
detailed_feedback += f"• Progression: {progression}/10 - Building on previous thinking\n"
detailed_feedback += f"• Decision Quality: {decision_quality}/10 - Making appropriate decisions\n"
detailed_feedback += f"• Conciseness: {conciseness}/10 - Communicating efficiently\n"
detailed_feedback += f"• Loop Avoidance: {loop_avoidance}/10 - Avoiding repetitive patterns\n\n"
detailed_feedback += f"Feedback:\n{feedback}\n\n"
detailed_feedback += f"Optimization Suggestions:\n{optimization_suggestions}"
return EvaluationScore(
score=float(overall_score),
feedback=detailed_feedback,
raw_response=response
)
except Exception as e:
logging.warning(f"Failed to parse reasoning efficiency evaluation: {e}")
return EvaluationScore(
score=None,
feedback=f"Failed to parse reasoning efficiency evaluation. Raw response: {response[:200]}...",
raw_response=response
)
def _detect_loops(self, llm_calls: List[Dict]) -> Tuple[bool, List[Dict]]:
loop_details = []
messages = []
for call in llm_calls:
content = call.get("response", "")
if isinstance(content, str):
messages.append(content)
elif isinstance(content, list) and len(content) > 0:
# Handle message list format
for msg in content:
if isinstance(msg, dict) and "content" in msg:
messages.append(msg["content"])
# Simple word-set (Jaccard) similarity detection
# For a more robust implementation, consider using embedding-based similarity
for i in range(len(messages) - 2):
for j in range(i + 1, len(messages) - 1):
# Check for repeated patterns (simplistic approach)
# A more sophisticated approach would use semantic similarity
similarity = self._calculate_text_similarity(messages[i], messages[j])
if similarity > 0.7: # Arbitrary threshold
loop_details.append({
"first_occurrence": i,
"second_occurrence": j,
"similarity": similarity,
"snippet": messages[i][:100] + "..."
})
return len(loop_details) > 0, loop_details
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
text1 = re.sub(r'\s+', ' ', text1.lower()).strip()
text2 = re.sub(r'\s+', ' ', text2.lower()).strip()
# Simple Jaccard similarity on word sets
words1 = set(text1.split())
words2 = set(text2.split())
intersection = len(words1.intersection(words2))
union = len(words1.union(words2))
return intersection / union if union > 0 else 0.0
def _analyze_reasoning_patterns(self, llm_calls: List[Dict]) -> Dict[str, Any]:
call_lengths = []
response_times = []
for call in llm_calls:
content = call.get("response", "")
if isinstance(content, str):
call_lengths.append(len(content))
elif isinstance(content, list) and len(content) > 0:
# Handle message list format
total_length = 0
for msg in content:
if isinstance(msg, dict) and "content" in msg:
total_length += len(msg["content"])
call_lengths.append(total_length)
start_time = call.get("start_time")
end_time = call.get("end_time")
if start_time and end_time:
try:
response_times.append(end_time - start_time)
except Exception:
pass
avg_length = np.mean(call_lengths) if call_lengths else 0
std_length = np.std(call_lengths) if call_lengths else 0
length_trend = self._calculate_trend(call_lengths)
primary_pattern = ReasoningPatternType.EFFICIENT
details = "Agent demonstrates efficient reasoning patterns."
loop_score = self._calculate_loop_likelihood(call_lengths, response_times)
if loop_score > 0.7:
primary_pattern = ReasoningPatternType.LOOP
details = "Agent appears to be stuck in repetitive thinking patterns."
elif avg_length > 1000 and std_length / avg_length < 0.3:
primary_pattern = ReasoningPatternType.VERBOSE
details = "Agent is consistently verbose across interactions."
elif len(llm_calls) > 10 and length_trend > 0.5:
primary_pattern = ReasoningPatternType.INDECISIVE
details = "Agent shows signs of indecisiveness with increasing message lengths."
elif avg_length > 0 and std_length / avg_length > 0.8:
primary_pattern = ReasoningPatternType.SCATTERED
details = "Agent shows inconsistent reasoning flow with highly variable responses."
return {
"primary_pattern": primary_pattern,
"details": details,
"metrics": {
"avg_length": avg_length,
"std_length": std_length,
"length_trend": length_trend,
"loop_score": loop_score
}
}
def _calculate_trend(self, values: Sequence[float | int]) -> float:
if not values or len(values) < 2:
return 0.0
try:
x = np.arange(len(values))
y = np.array(values)
# Simple linear regression
slope = np.polyfit(x, y, 1)[0]
# Normalize slope to -1 to 1 range
max_possible_slope = max(values) - min(values)
if max_possible_slope > 0:
normalized_slope = slope / max_possible_slope
return max(min(normalized_slope, 1.0), -1.0)
return 0.0
except Exception:
return 0.0
def _calculate_loop_likelihood(self, call_lengths: Sequence[float], response_times: Sequence[float]) -> float:
if not call_lengths or len(call_lengths) < 3:
return 0.0
indicators = []
if len(call_lengths) >= 4:
repeated_lengths = 0
for i in range(len(call_lengths) - 2):
ratio = call_lengths[i] / call_lengths[i + 2] if call_lengths[i + 2] > 0 else 0
if 0.85 <= ratio <= 1.15:
repeated_lengths += 1
length_repetition_score = repeated_lengths / (len(call_lengths) - 2)
indicators.append(length_repetition_score)
if response_times and len(response_times) >= 3:
try:
std_time = np.std(response_times)
mean_time = np.mean(response_times)
if mean_time > 0:
time_consistency = 1.0 - (std_time / mean_time)
indicators.append(max(0, time_consistency - 0.3) * 1.5)
except Exception:
pass
return np.mean(indicators) if indicators else 0.0
def _get_call_samples(self, llm_calls: List[Dict]) -> str:
samples = []
if len(llm_calls) <= 6:
sample_indices = list(range(len(llm_calls)))
else:
sample_indices = [0, 1, len(llm_calls) // 2 - 1, len(llm_calls) // 2,
len(llm_calls) - 2, len(llm_calls) - 1]
for idx in sample_indices:
call = llm_calls[idx]
content = call.get("response", "")
if isinstance(content, str):
sample = content
elif isinstance(content, list) and len(content) > 0:
sample_parts = []
for msg in content:
if isinstance(msg, dict) and "content" in msg:
sample_parts.append(msg["content"])
sample = "\n".join(sample_parts)
else:
sample = str(content)
truncated = sample[:200] + "..." if len(sample) > 200 else sample
samples.append(f"Call {idx + 1}:\n{truncated}\n")
return "\n".join(samples)
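
The loop detection above rests on a word-set Jaccard similarity with a 0.7 threshold. A standalone sketch of that measure follows; `jaccard_similarity` and the sample sentences are hypothetical, chosen to show one pair above the threshold and one well below it.

```python
import re


def jaccard_similarity(text1: str, text2: str) -> float:
    """Ratio of shared words to all distinct words, case-insensitive."""
    words1 = set(re.sub(r"\s+", " ", text1.lower()).strip().split())
    words2 = set(re.sub(r"\s+", " ", text2.lower()).strip().split())
    union = words1 | words2
    return len(words1 & words2) / len(union) if union else 0.0


step_a = "I should search the docs for the API key"
step_b = "I should search the docs for the API token"
final = "Final answer: the report is ready"

# 7 shared words out of 9 distinct words.
near_duplicate = jaccard_similarity(step_a, step_b)
# Only "the" is shared.
unrelated = jaccard_similarity(step_a, final)

LOOP_THRESHOLD = 0.7  # same value as the threshold used in _detect_loops
flagged_as_loop = near_duplicate > LOOP_THRESHOLD
```

Because the measure ignores word order and frequency, two responses that reuse the same vocabulary for different conclusions can still be flagged, which is consistent with the source comment suggesting embedding-based similarity as a more robust option.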

@@ -0,0 +1,68 @@
from typing import Any, Dict
from crewai.agent import Agent
from crewai.task import Task
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
class SemanticQualityEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.SEMANTIC_QUALITY
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: Any,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing the semantic quality of an AI agent's output.
Score the semantic quality on a scale from 0-10 where:
- 0: Completely incoherent, confusing, or logically flawed output
- 5: Moderately clear and logical output with some issues
- 10: Exceptionally clear, coherent, and logically sound output
Consider:
1. Is the output well-structured and organized?
2. Is the reasoning logical and well-supported?
3. Is the language clear, precise, and appropriate for the task?
4. Are claims supported by evidence when appropriate?
5. Is the output free from contradictions and logical fallacies?
Return your evaluation as JSON with fields 'score' (number) and 'feedback' (string).
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Agent's final output:
{final_output}
Evaluate the semantic quality and reasoning of this output.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data: dict[str, Any] = extract_json_from_llm_response(response)
assert evaluation_data is not None
return EvaluationScore(
score=float(evaluation_data["score"]) if evaluation_data.get("score") is not None else None,
feedback=evaluation_data.get("feedback", response),
raw_response=response
)
except Exception:
return EvaluationScore(
score=None,
feedback=f"Failed to parse evaluation. Raw response: {response}",
raw_response=response
)

@@ -0,0 +1,410 @@
import json
from typing import Dict, Any
from crewai.experimental.evaluation.base_evaluator import BaseEvaluator, EvaluationScore, MetricCategory
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.agent import Agent
from crewai.task import Task
class ToolSelectionEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.TOOL_SELECTION
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
unique_tool_types = {tool.get("tool", "Unknown tool") for tool in tool_uses}
if tool_count == 0:
if not agent.tools:
return EvaluationScore(
score=None,
feedback="Agent had no tools available to use."
)
else:
return EvaluationScore(
score=None,
feedback="Agent had tools available but didn't use any."
)
available_tools_info = ""
if agent.tools:
for tool in agent.tools:
available_tools_info += f"- {tool.name}: {tool.description}\n"
else:
available_tools_info = "No tools available"
tool_types_summary = "Tools selected by the agent:\n"
for tool_type in sorted(unique_tool_types):
tool_types_summary += f"- {tool_type}\n"
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing if an AI agent selected the most appropriate tools for a given task.
You must evaluate based on these 2 criteria:
1. Relevance (0-10): Were the tools chosen directly aligned with the task's goals?
2. Coverage (0-10): Did the agent select ALL appropriate tools from the AVAILABLE tools?
IMPORTANT:
- ONLY consider tools that are listed as available to the agent
- DO NOT suggest tools that aren't in the 'Available tools' list
- DO NOT evaluate the quality or accuracy of tool outputs/results
- DO NOT evaluate how many times each tool was used
- DO NOT evaluate how the agent used the parameters
- DO NOT evaluate whether the agent interpreted the task correctly
Focus ONLY on whether the correct CATEGORIES of tools were selected from what was available.
Return your evaluation as JSON with these fields:
- scores: {"relevance": number, "coverage": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on tool selection decisions from available tools)
- improvement_suggestions: string (ONLY suggest better selection from the AVAILABLE tools list, NOT new tools)
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Available tools for this agent:
{available_tools_info}
{tool_types_summary}
Based ONLY on the task description and comparing the AVAILABLE tools with those that were selected (listed above), evaluate if the agent selected the appropriate tool types for this task.
IMPORTANT:
- ONLY evaluate selection from tools listed as available
- DO NOT suggest new tools that aren't in the available tools list
- DO NOT evaluate tool usage or results
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
scores = evaluation_data.get("scores", {})
relevance = scores.get("relevance", 5.0)
coverage = scores.get("coverage", 5.0)
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Tool Selection Evaluation:\n"
feedback += f"• Relevance: {relevance}/10 - Selection of appropriate tool types for the task\n"
feedback += f"• Coverage: {coverage}/10 - Selection of all necessary tool types\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating tool selection: {e}",
raw_response=response
)
class ParameterExtractionEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.PARAMETER_EXTRACTION
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_count = len(tool_uses)
if tool_count == 0:
return EvaluationScore(
score=None,
feedback="No tool usage detected. Cannot evaluate parameter extraction."
)
validation_errors = []
for tool_use in tool_uses:
if not tool_use.get("success", True) and tool_use.get("error_type") == "validation_error":
validation_errors.append({
"tool": tool_use.get("tool", "Unknown tool"),
"error": tool_use.get("result"),
"args": tool_use.get("args", {})
})
validation_error_rate = len(validation_errors) / tool_count if tool_count > 0 else 0
param_samples = []
for i, tool_use in enumerate(tool_uses[:5]):
tool_name = tool_use.get("tool", "Unknown tool")
tool_args = tool_use.get("args", {})
success = tool_use.get("success", True) and not tool_use.get("error", False)
error_type = tool_use.get("error_type", "") if not success else ""
is_validation_error = error_type == "validation_error"
sample = f"Tool use #{i+1} - {tool_name}:\n"
sample += f"- Parameters: {json.dumps(tool_args, indent=2)}\n"
sample += f"- Success: {'No' if not success else 'Yes'}"
if is_validation_error:
sample += " (PARAMETER VALIDATION ERROR)\n"
sample += f"- Error: {tool_use.get('result', 'Unknown error')}"
elif not success:
sample += f" (Other error: {error_type})\n"
param_samples.append(sample)
validation_errors_info = ""
if validation_errors:
validation_errors_info = f"\nParameter validation errors detected: {len(validation_errors)} ({validation_error_rate:.1%} of tool uses)\n"
for i, err in enumerate(validation_errors[:3]):
tool_name = err.get("tool", "Unknown tool")
error_msg = err.get("error", "Unknown error")
args = err.get("args", {})
validation_errors_info += f"\nValidation Error #{i+1}:\n- Tool: {tool_name}\n- Args: {json.dumps(args, indent=2)}\n- Error: {error_msg}"
if len(validation_errors) > 3:
validation_errors_info += f"\n...and {len(validation_errors) - 3} more validation errors."
param_samples_text = "\n\n".join(param_samples)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how well an AI agent extracts and formats PARAMETER VALUES for tool calls.
Your job is to evaluate ONLY whether the agent used the correct parameter VALUES, not whether the right tools were selected or how the tools were invoked.
Evaluate parameter extraction based on these criteria:
1. Accuracy (0-10): Are parameter values correctly identified from the context/task?
2. Formatting (0-10): Are values formatted correctly for each tool's requirements?
3. Completeness (0-10): Are all required parameter values provided, with no missing information?
IMPORTANT: DO NOT evaluate:
- Whether the right tool was chosen (that's the ToolSelectionEvaluator's job)
- How the tools were structurally invoked (that's the ToolInvocationEvaluator's job)
- The quality of results from tools
Focus ONLY on the PARAMETER VALUES - whether they were correctly extracted from the context, properly formatted, and complete.
Validation errors are important signals that parameter values weren't properly extracted or formatted.
Return your evaluation as JSON with these fields:
- scores: {"accuracy": number, "formatting": number, "completeness": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on parameter value extraction quality)
- improvement_suggestions: string (concrete suggestions for better parameter VALUE extraction)
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Parameter extraction examples:
{param_samples_text}
{validation_errors_info}
Evaluate the quality of the agent's parameter extraction for this task.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
scores = evaluation_data.get("scores", {})
accuracy = scores.get("accuracy", 5.0)
formatting = scores.get("formatting", 5.0)
completeness = scores.get("completeness", 5.0)
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Parameter Extraction Evaluation:\n"
feedback += f"• Accuracy: {accuracy}/10 - Correctly identifying required parameters\n"
feedback += f"• Formatting: {formatting}/10 - Properly formatting parameters for tools\n"
feedback += f"• Completeness: {completeness}/10 - Including all necessary information\n\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating parameter extraction: {e}",
raw_response=response
)
class ToolInvocationEvaluator(BaseEvaluator):
@property
def metric_category(self) -> MetricCategory:
return MetricCategory.TOOL_INVOCATION
def evaluate(
self,
agent: Agent,
execution_trace: Dict[str, Any],
final_output: str,
task: Task | None = None,
) -> EvaluationScore:
task_context = ""
if task is not None:
task_context = f"Task description: {task.description}"
tool_uses = execution_trace.get("tool_uses", [])
tool_errors = []
tool_count = len(tool_uses)
if tool_count == 0:
return EvaluationScore(
score=None,
feedback="No tool usage detected. Cannot evaluate tool invocation."
)
for tool_use in tool_uses:
if not tool_use.get("success", True) or tool_use.get("error", False):
error_info = {
"tool": tool_use.get("tool", "Unknown tool"),
"error": tool_use.get("result"),
"error_type": tool_use.get("error_type", "unknown_error")
}
tool_errors.append(error_info)
error_rate = len(tool_errors) / tool_count if tool_count > 0 else 0
error_types = {}
for error in tool_errors:
error_type = error.get("error_type", "unknown_error")
if error_type not in error_types:
error_types[error_type] = 0
error_types[error_type] += 1
invocation_samples = []
for i, tool_use in enumerate(tool_uses[:5]):
tool_name = tool_use.get("tool", "Unknown tool")
tool_args = tool_use.get("args", {})
success = tool_use.get("success", True) and not tool_use.get("error", False)
error_type = tool_use.get("error_type", "") if not success else ""
error_msg = tool_use.get("result", "No error") if not success else "No error"
sample = f"Tool invocation #{i+1}:\n"
sample += f"- Tool: {tool_name}\n"
sample += f"- Parameters: {json.dumps(tool_args, indent=2)}\n"
sample += f"- Success: {'No' if not success else 'Yes'}\n"
if not success:
sample += f"- Error type: {error_type}\n"
sample += f"- Error: {error_msg}"
invocation_samples.append(sample)
error_type_summary = ""
if error_types:
error_type_summary = "Error type breakdown:\n"
for error_type, count in error_types.items():
error_type_summary += f"- {error_type}: {count} occurrences ({(count/tool_count):.1%})\n"
invocation_samples_text = "\n\n".join(invocation_samples)
prompt = [
{"role": "system", "content": """You are an expert evaluator assessing how correctly an AI agent's tool invocations are STRUCTURED.
Your job is to evaluate ONLY the structural and syntactical aspects of how the agent called tools, NOT which tools were selected or what parameter values were used.
Evaluate the agent's tool invocation based on these criteria:
1. Structure (0-10): Does the tool call follow the expected syntax and format?
2. Error Handling (0-10): Does the agent handle tool errors appropriately?
3. Invocation Patterns (0-10): Are tool calls properly sequenced, batched, or managed?
Error types that indicate invocation issues:
- execution_error: The tool was called correctly but failed during execution
- usage_error: General errors in how the tool was used structurally
IMPORTANT: DO NOT evaluate:
- Whether the right tool was chosen (that's the ToolSelectionEvaluator's job)
- Whether the parameter values are correct (that's the ParameterExtractionEvaluator's job)
- The quality of results from tools
Focus ONLY on HOW tools were invoked - the structure, format, and handling of the invocation process.
Return your evaluation as JSON with these fields:
- scores: {"structure": number, "error_handling": number, "invocation_patterns": number}
- overall_score: number (average of all scores, 0-10)
- feedback: string (focused ONLY on structural aspects of tool invocation)
- improvement_suggestions: string (concrete suggestions for better structuring of tool calls)
"""},
{"role": "user", "content": f"""
Agent role: {agent.role}
{task_context}
Tool invocation examples:
{invocation_samples_text}
Tool error rate: {error_rate:.2%} ({len(tool_errors)} errors out of {tool_count} invocations)
{error_type_summary}
Evaluate the quality of the agent's tool invocation structure during this task.
"""}
]
assert self.llm is not None
response = self.llm.call(prompt)
try:
evaluation_data = extract_json_from_llm_response(response)
assert evaluation_data is not None
scores = evaluation_data.get("scores", {})
structure = scores.get("structure", 5.0)
error_handling = scores.get("error_handling", 5.0)
invocation_patterns = scores.get("invocation_patterns", 5.0)
overall_score = float(evaluation_data.get("overall_score", 5.0))
feedback = "Tool Invocation Evaluation:\n"
feedback += f"• Structure: {structure}/10 - Following proper syntax and format\n"
feedback += f"• Error Handling: {error_handling}/10 - Appropriately handling tool errors\n"
feedback += f"• Invocation Patterns: {invocation_patterns}/10 - Proper sequencing and management of calls\n\n"
if "improvement_suggestions" in evaluation_data:
feedback += f"Improvement Suggestions:\n{evaluation_data['improvement_suggestions']}"
else:
feedback += evaluation_data.get("feedback", "No detailed feedback available.")
return EvaluationScore(
score=overall_score,
feedback=feedback,
raw_response=response
)
except Exception as e:
return EvaluationScore(
score=None,
feedback=f"Error evaluating tool invocation: {e}",
raw_response=response
)
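
Reviewer note: the error-rate and error-type bookkeeping in `ToolInvocationEvaluator.evaluate` can be read in isolation. The sketch below is a minimal standalone version, assuming the same `tool_uses` dictionary shape seen in the diff (`success`, `error`, `error_type` keys). The function name `summarize_tool_errors` and the sample trace are hypothetical, not part of the commit.

```python
from collections import Counter
from typing import Any


def summarize_tool_errors(tool_uses: list[dict[str, Any]]) -> dict[str, Any]:
    """Count failed tool uses and group them by error type (hypothetical helper)."""
    # A use counts as failed when "success" is falsy or "error" is truthy,
    # mirroring the condition used in the evaluator.
    failed = [
        use
        for use in tool_uses
        if not use.get("success", True) or use.get("error", False)
    ]
    # Failures without an explicit type are bucketed as "unknown_error".
    error_types = Counter(use.get("error_type", "unknown_error") for use in failed)
    total = len(tool_uses)
    return {
        "error_rate": len(failed) / total if total else 0.0,
        "error_types": dict(error_types),
    }


# Invented trace data, for illustration only.
sample_uses = [
    {"tool": "search", "args": {"q": "crewai"}, "success": True},
    {"tool": "search", "args": {}, "success": False, "error_type": "validation_error"},
    {"tool": "fetch", "args": {"url": "x"}, "success": False},
    {"tool": "fetch", "args": {"url": "y"}, "error": True, "error_type": "execution_error"},
]
summary = summarize_tool_errors(sample_uses)
print(summary)
```

Using `Counter` replaces the manual "initialise to 0, then increment" loop without changing the resulting counts.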


@@ -0,0 +1,52 @@
import inspect
from typing_extensions import Any
import warnings
from crewai.experimental.evaluation.experiment import ExperimentResults, ExperimentRunner
from crewai import Crew, Agent
def assert_experiment_successfully(experiment_results: ExperimentResults, baseline_filepath: str | None = None) -> None:
failed_tests = [result for result in experiment_results.results if not result.passed]
if failed_tests:
detailed_failures: list[str] = []
for result in failed_tests:
expected = result.expected_score
actual = result.score
detailed_failures.append(f"- {result.identifier}: expected {expected}, got {actual}")
failure_details = "\n".join(detailed_failures)
raise AssertionError(f"The following test cases failed:\n{failure_details}")
baseline_filepath = baseline_filepath or _get_baseline_filepath_fallback()
comparison = experiment_results.compare_with_baseline(baseline_filepath=baseline_filepath)
assert_experiment_no_regression(comparison)
def assert_experiment_no_regression(comparison_result: dict[str, list[str]]) -> None:
regressed = comparison_result.get("regressed", [])
if regressed:
raise AssertionError(f"Regression detected! The following tests that previously passed now fail: {regressed}")
missing_tests = comparison_result.get("missing_tests", [])
if missing_tests:
warnings.warn(
f"Warning: {len(missing_tests)} tests from the baseline are missing in the current run: {missing_tests}",
UserWarning
)
def run_experiment(dataset: list[dict[str, Any]], crew: Crew | None = None, agents: list[Agent] | None = None, verbose: bool = False) -> ExperimentResults:
runner = ExperimentRunner(dataset=dataset)
return runner.run(agents=agents, crew=crew, print_summary=verbose)
def _get_baseline_filepath_fallback() -> str:
test_func_name = "experiment_fallback"
try:
current_frame = inspect.currentframe()
if current_frame is not None:
test_func_name = current_frame.f_back.f_back.f_code.co_name # type: ignore[union-attr]
except Exception:
...
return f"{test_func_name}_results.json"
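
Reviewer note: `_get_baseline_filepath_fallback` derives the baseline filename by walking two frames up the call stack, which only yields the test function's name when the helper is reached through exactly one intermediate call. A minimal sketch of that mechanism, using hypothetical function names and only the standard `inspect` module:

```python
import inspect


def baseline_path_fallback() -> str:
    """Name the baseline file after the function two frames up the stack."""
    name = "experiment_fallback"
    frame = inspect.currentframe()
    # f_back once is the direct caller (the assertion helper);
    # f_back twice is whoever called that helper (the test function).
    if frame is not None and frame.f_back is not None and frame.f_back.f_back is not None:
        name = frame.f_back.f_back.f_code.co_name
    return f"{name}_results.json"


def assert_helper() -> str:
    # Stands in for assert_experiment_successfully: one hop between test and fallback.
    return baseline_path_fallback()


def history_teacher_case() -> str:
    # Stands in for a test function that calls the assertion helper.
    return assert_helper()


print(history_teacher_case())
```

If the helper were called through an extra wrapper, the derived name would be the wrapper's, so passing `baseline_filepath` explicitly is the more robust option.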


@@ -2,6 +2,7 @@ import asyncio
import copy
import inspect
import logging
import os
from typing import (
Any,
Callable,
@@ -32,6 +33,9 @@ from crewai.utilities.events.flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.utilities.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.utilities.printer import Printer
logger = logging.getLogger(__name__)
@@ -436,6 +440,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
_routers: Set[str] = set()
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
name: Optional[str] = None
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
class _FlowGeneric(cls): # type: ignore
@@ -464,7 +469,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Initialize state with initial values
self._state = self._create_initial_state()
if os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true":
trace_listener = TraceCollectionListener()
trace_listener.setup_listeners(crewai_event_bus)
# Apply any additional kwargs
if kwargs:
self._initialize_state(kwargs)
@@ -473,7 +480,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowCreatedEvent(
type="flow_created",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
),
)
@@ -769,7 +776,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
),
)
@@ -792,7 +799,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
result=final_output,
),
)
@@ -834,7 +841,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
params=dumped_params,
state=self._copy_state(),
),
@@ -856,7 +863,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
state=self._copy_state(),
result=result,
),
@@ -869,7 +876,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
@@ -1076,7 +1083,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowPlotEvent(
type="flow_plot",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
),
)
plot_flow(self, filename)
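
Reviewer note: every event in this file now reports `self.name or self.__class__.__name__`. The sketch below shows that fallback rule on a stand-in class. `NamedFlow` and `event_flow_name` are hypothetical names, not the real `Flow` API.

```python
from typing import Optional


class NamedFlow:
    """Stand-in for a Flow subclass; only the naming rule is modelled."""

    name: Optional[str] = None

    def event_flow_name(self) -> str:
        # An unset (None) or empty name falls back to the class name.
        return self.name or self.__class__.__name__


class ReportFlow(NamedFlow):
    pass


class RenamedFlow(NamedFlow):
    name = "weekly-report"


print(ReportFlow().event_flow_name())
print(RenamedFlow().event_flow_name())
```

Because `or` is used rather than an `is None` check, an empty string is treated the same as no name.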


@@ -81,7 +81,7 @@ class SQLiteFlowPersistence(FlowPersistence):
"""
# Convert state_data to dict, handling both Pydantic and dict cases
if isinstance(state_data, BaseModel):
state_dict = dict(state_data) # Use dict() for better type compatibility
state_dict = state_data.model_dump()
elif isinstance(state_data, dict):
state_dict = state_data
else:


@@ -1,55 +0,0 @@
from abc import ABC, abstractmethod
from typing import List
import numpy as np
class BaseEmbedder(ABC):
"""
Abstract base class for text embedding models
"""
@abstractmethod
def embed_chunks(self, chunks: List[str]) -> np.ndarray:
"""
Generate embeddings for a list of text chunks
Args:
chunks: List of text chunks to embed
Returns:
Array of embeddings
"""
pass
@abstractmethod
def embed_texts(self, texts: List[str]) -> np.ndarray:
"""
Generate embeddings for a list of texts
Args:
texts: List of texts to embed
Returns:
Array of embeddings
"""
pass
@abstractmethod
def embed_text(self, text: str) -> np.ndarray:
"""
Generate embedding for a single text
Args:
text: Text to embed
Returns:
Embedding array
"""
pass
@property
@abstractmethod
def dimension(self) -> int:
"""Get the dimension of the embeddings"""
pass


@@ -13,11 +13,12 @@ from chromadb.api.types import OneOrMany
from chromadb.config import Settings
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
from crewai.utilities.chromadb import sanitize_collection_name
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger
from crewai.utilities.paths import db_storage_path
from crewai.utilities.chromadb import create_persistent_client
@contextlib.contextmanager
@@ -84,14 +85,11 @@ class KnowledgeStorage(BaseKnowledgeStorage):
raise Exception("Collection not initialized")
def initialize_knowledge_storage(self):
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
self.app = create_persistent_client(
path=os.path.join(db_storage_path(), "knowledge"),
settings=Settings(allow_reset=True),
)
self.app = chroma_client
try:
collection_name = (
f"knowledge_{self.collection_name}"
@@ -111,9 +109,8 @@ class KnowledgeStorage(BaseKnowledgeStorage):
def reset(self):
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
if not self.app:
self.app = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
self.app = create_persistent_client(
path=base_path, settings=Settings(allow_reset=True)
)
self.app.reset()


@@ -40,7 +40,7 @@ from crewai.agents.parser import (
OutputParserException,
)
from crewai.flow.flow_trackable import FlowTrackable
from crewai.llm import LLM
from crewai.llm import LLM, BaseLLM
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities import I18N
@@ -135,7 +135,7 @@ class LiteAgent(FlowTrackable, BaseModel):
role: str = Field(description="Role of the agent")
goal: str = Field(description="Goal of the agent")
backstory: str = Field(description="Backstory of the agent")
llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
llm: Optional[Union[str, InstanceOf[BaseLLM], Any]] = Field(
default=None, description="Language model that will run the agent"
)
tools: List[BaseTool] = Field(
@@ -147,7 +147,7 @@ class LiteAgent(FlowTrackable, BaseModel):
default=15, description="Maximum number of iterations for tool usage"
)
max_execution_time: Optional[int] = Field(
default=None, description="Maximum execution time in seconds"
default=None, description="Maximum execution time in seconds"
)
respect_context_window: bool = Field(
default=True,
@@ -209,8 +209,10 @@ class LiteAgent(FlowTrackable, BaseModel):
def setup_llm(self):
"""Set up the LLM and other components after initialization."""
self.llm = create_llm(self.llm)
if not isinstance(self.llm, LLM):
raise ValueError("Unable to create LLM instance")
if not isinstance(self.llm, BaseLLM):
raise ValueError(
f"Expected LLM instance of type BaseLLM, got {type(self.llm).__name__}"
)
# Initialize callbacks
token_callback = TokenCalcHandler(token_cost_process=self._token_process)
@@ -232,7 +234,10 @@ class LiteAgent(FlowTrackable, BaseModel):
elif isinstance(self.guardrail, str):
from crewai.tasks.llm_guardrail import LLMGuardrail
assert isinstance(self.llm, LLM)
if not isinstance(self.llm, BaseLLM):
raise TypeError(
f"Guardrail requires LLM instance of type BaseLLM, got {type(self.llm).__name__}"
)
self._guardrail = LLMGuardrail(description=self.guardrail, llm=self.llm)
@@ -304,6 +309,7 @@ class LiteAgent(FlowTrackable, BaseModel):
"""
# Create agent info for event emission
agent_info = {
"id": self.id,
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,
@@ -513,7 +519,8 @@ class LiteAgent(FlowTrackable, BaseModel):
enforce_rpm_limit(self.request_within_rpm_limit)
# Emit LLM call started event
llm = cast(LLM, self.llm)
model = llm.model if hasattr(llm, "model") else "unknown"
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
@@ -521,6 +528,7 @@ class LiteAgent(FlowTrackable, BaseModel):
tools=None,
callbacks=self._callbacks,
from_agent=self,
model=model,
),
)
@@ -537,9 +545,11 @@ class LiteAgent(FlowTrackable, BaseModel):
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(
messages=self._messages,
response=answer,
call_type=LLMCallType.LLM_CALL,
from_agent=self,
model=model,
),
)
except Exception as e:


@@ -59,6 +59,8 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
load_dotenv()
litellm.suppress_debug_info = True
class FilteredStream(io.TextIOBase):
_lock = None
@@ -76,10 +78,9 @@ class FilteredStream(io.TextIOBase):
# Skip common noisy LiteLLM banners and any other lines that contain "litellm"
if (
"give feedback / get help" in lower_s
or "litellm.info:" in lower_s
or "litellm" in lower_s
or "Consider using a smaller input or implementing a text splitting strategy" in lower_s
"litellm.info:" in lower_s
or "Consider using a smaller input or implementing a text splitting strategy"
in lower_s
):
return 0
@@ -287,6 +288,8 @@ class AccumulatedToolArgs(BaseModel):
class LLM(BaseLLM):
completion_cost: Optional[float] = None
def __init__(
self,
model: str,
@@ -508,7 +511,6 @@ class LLM(BaseLLM):
# Enable tool calls using streaming
if "tool_calls" in delta:
tool_calls = delta["tool_calls"]
if tool_calls:
result = self._handle_streaming_tool_calls(
tool_calls=tool_calls,
@@ -517,6 +519,7 @@ class LLM(BaseLLM):
from_task=from_task,
from_agent=from_agent,
)
if result is not None:
chunk_content = result
@@ -533,7 +536,11 @@ class LLM(BaseLLM):
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMStreamChunkEvent(chunk=chunk_content, from_task=from_task, from_agent=from_agent),
event=LLMStreamChunkEvent(
chunk=chunk_content,
from_task=from_task,
from_agent=from_agent,
),
)
# --- 4) Fallback to non-streaming if no content received
if not full_response.strip() and chunk_count == 0:
@@ -546,7 +553,11 @@ class LLM(BaseLLM):
"stream_options", None
) # Remove stream_options for non-streaming call
return self._handle_non_streaming_response(
non_streaming_params, callbacks, available_functions, from_task, from_agent
non_streaming_params,
callbacks,
available_functions,
from_task,
from_agent,
)
# --- 5) Handle empty response with chunks
@@ -631,7 +642,13 @@ class LLM(BaseLLM):
# Log token usage if available in streaming mode
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# Emit completion event and return response
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
self._handle_emit_call_events(
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return full_response
# --- 9) Handle tool calls if present
@@ -643,7 +660,13 @@ class LLM(BaseLLM):
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# --- 11) Emit completion event and return response
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
self._handle_emit_call_events(
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return full_response
except ContextWindowExceededError as e:
@@ -655,14 +678,22 @@ class LLM(BaseLLM):
logging.error(f"Error in streaming response: {str(e)}")
if full_response.strip():
logging.warning(f"Returning partial response despite error: {str(e)}")
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL, from_task, from_agent)
self._handle_emit_call_events(
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return full_response
# Emit failed event and re-raise the exception
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=str(e), from_task=from_task, from_agent=from_agent),
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
raise Exception(f"Failed to get streaming response: {str(e)}")
@@ -760,7 +791,7 @@ class LLM(BaseLLM):
available_functions: Optional[Dict[str, Any]] = None,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
) -> str:
) -> str | Any:
"""Handle a non-streaming response from the LLM.
Args:
@@ -780,17 +811,16 @@ class LLM(BaseLLM):
# across the codebase. This allows CrewAgentExecutor to handle context
# length issues appropriately.
response = litellm.completion(**params)
except ContextWindowExceededError as e:
# Convert litellm's context window error to our own exception type
# for consistent handling in the rest of the codebase
raise LLMContextLengthExceededException(str(e))
# --- 2) Extract response message and content
response_message = cast(Choices, cast(ModelResponse, response).choices)[
0
].message
text_response = response_message.content or ""
# --- 3) Handle callbacks with usage info
if callbacks and len(callbacks) > 0:
for callback in callbacks:
@@ -803,22 +833,35 @@ class LLM(BaseLLM):
start_time=0,
end_time=0,
)
# --- 4) Check for tool calls
tool_calls = getattr(response_message, "tool_calls", [])
# --- 5) If no tool calls or no available functions, return the text response directly
if not tool_calls or not available_functions:
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task, from_agent)
# --- 5) If no tool calls or no available functions, return the text response directly as long as there is a text response
if (not tool_calls or not available_functions) and text_response:
self._handle_emit_call_events(
response=text_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return text_response
# --- 6) If there is no text response, no available functions, but there are tool calls, return the tool calls
elif tool_calls and not available_functions and not text_response:
return tool_calls
# --- 6) Handle tool calls if present
# --- 7) Handle tool calls if present
tool_result = self._handle_tool_call(tool_calls, available_functions)
if tool_result is not None:
return tool_result
# --- 7) If tool call handling didn't return a result, emit completion event and return text response
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL, from_task, from_agent)
# --- 8) If tool call handling didn't return a result, emit completion event and return text response
self._handle_emit_call_events(
response=text_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return text_response
def _handle_tool_call(
@@ -861,6 +904,7 @@ class LLM(BaseLLM):
tool_args=function_args,
),
)
result = fn(**function_args)
crewai_event_bus.emit(
self,
@@ -874,7 +918,9 @@ class LLM(BaseLLM):
)
# --- 3.3) Emit success event
self._handle_emit_call_events(result, LLMCallType.TOOL_CALL)
self._handle_emit_call_events(
response=result, call_type=LLMCallType.TOOL_CALL
)
return result
except Exception as e:
# --- 3.4) Handle execution errors
@@ -892,7 +938,7 @@ class LLM(BaseLLM):
event=ToolUsageErrorEvent(
tool_name=function_name,
tool_args=function_args,
error=f"Tool execution error: {str(e)}"
error=f"Tool execution error: {str(e)}",
),
)
return None
@@ -942,6 +988,7 @@ class LLM(BaseLLM):
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
@@ -951,22 +998,18 @@ class LLM(BaseLLM):
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
message["role"] = "assistant"
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
return self._handle_streaming_response(
@@ -983,25 +1026,69 @@ class LLM(BaseLLM):
# whether to summarize the content or abort based on the respect_context_window flag
raise
except Exception as e:
unsupported_stop = "Unsupported parameter" in str(
e
) and "'stop'" in str(e)
if unsupported_stop:
if (
"additional_drop_params" in self.additional_params
and isinstance(
self.additional_params["additional_drop_params"], list
)
):
self.additional_params["additional_drop_params"].append("stop")
else:
self.additional_params = {"additional_drop_params": ["stop"]}
logging.info("Retrying LLM call without the unsupported 'stop'")
return self.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
)
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=str(e), from_task=from_task, from_agent=from_agent),
event=LLMCallFailedEvent(
error=str(e), from_task=from_task, from_agent=from_agent
),
)
logging.error(f"LiteLLM call failed: {str(e)}")
raise
def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None, from_agent: Optional[Any] = None):
def _handle_emit_call_events(
self,
response: Any,
call_type: LLMCallType,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
messages: str | list[dict[str, Any]] | None = None,
):
"""Handle the events for the LLM call.
Args:
response (str): The response from the LLM call.
call_type (str): The type of call, either "tool_call" or "llm_call".
from_task: Optional task object
from_agent: Optional agent object
messages: Optional messages object
"""
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(response=response, call_type=call_type, from_task=from_task, from_agent=from_agent),
event=LLMCallCompletedEvent(
messages=messages,
response=response,
call_type=call_type,
from_task=from_task,
from_agent=from_agent,
model=self.model,
),
)
def _format_messages_for_provider(
@@ -1054,6 +1141,17 @@ class LLM(BaseLLM):
messages.append({"role": "user", "content": "Please continue."})
return messages
# TODO: Remove this code after merging PR https://github.com/BerriAI/litellm/pull/10917
# Ollama doesn't support the last message being 'assistant'
if (
"ollama" in self.model.lower()
and messages
and messages[-1]["role"] == "assistant"
):
messages = messages.copy()
messages.append({"role": "user", "content": ""})
return messages
# Handle Anthropic models
if not self.is_anthropic:
return messages
@@ -1073,7 +1171,7 @@ class LLM(BaseLLM):
- If there is no '/', defaults to "openai".
"""
if "/" in self.model:
return self.model.split("/")[0]
return self.model.partition("/")[0]
return None
def _validate_call_params(self) -> None:
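
Reviewer note: the Ollama branch added to `_format_messages_for_provider` appends an empty user turn when the conversation would otherwise end on an assistant message. A minimal standalone sketch of that rule follows. The function name `pad_trailing_assistant` and the model strings are hypothetical.

```python
from typing import Any


def pad_trailing_assistant(
    model: str, messages: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Add an empty user turn if an Ollama model would end on 'assistant'."""
    if "ollama" in model.lower() and messages and messages[-1]["role"] == "assistant":
        # Copy first so the caller's list is not mutated.
        padded = messages.copy()
        padded.append({"role": "user", "content": ""})
        return padded
    # Other models, or histories ending on another role, pass through unchanged.
    return messages


history = [
    {"role": "user", "content": "hi"},
    {"role": "assistant", "content": "hello"},
]
padded_history = pad_trailing_assistant("ollama/llama3", history)
print(padded_history[-1])
```

The shallow copy keeps the original history intact, which matters if the same list is reused for a later call to a different provider.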


@@ -1,11 +1,9 @@
from .entity.entity_memory import EntityMemory
from .long_term.long_term_memory import LongTermMemory
from .short_term.short_term_memory import ShortTermMemory
from .user.user_memory import UserMemory
from .external.external_memory import ExternalMemory
__all__ = [
"UserMemory",
"EntityMemory",
"LongTermMemory",
"ShortTermMemory",


@@ -1,32 +1,24 @@
from typing import Any, Dict, Optional
from typing import Optional
from crewai.memory import (
EntityMemory,
ExternalMemory,
LongTermMemory,
ShortTermMemory,
UserMemory,
)
class ContextualMemory:
def __init__(
self,
memory_config: Optional[Dict[str, Any]],
stm: ShortTermMemory,
ltm: LongTermMemory,
em: EntityMemory,
um: UserMemory,
exm: ExternalMemory,
):
if memory_config is not None:
self.memory_provider = memory_config.get("provider")
else:
self.memory_provider = None
self.stm = stm
self.ltm = ltm
self.em = em
self.um = um
self.exm = exm
def build_context_for_task(self, task, context) -> str:
@@ -44,8 +36,6 @@ class ContextualMemory:
context.append(self._fetch_stm_context(query))
context.append(self._fetch_entity_context(query))
context.append(self._fetch_external_context(query))
if self.memory_provider == "mem0":
context.append(self._fetch_user_context(query))
return "\n".join(filter(None, context))
def _fetch_stm_context(self, query) -> str:
@@ -60,7 +50,7 @@ class ContextualMemory:
stm_results = self.stm.search(query)
formatted_results = "\n".join(
[
f"- {result['memory'] if self.memory_provider == 'mem0' else result['context']}"
f"- {result['context']}"
for result in stm_results
]
)
@@ -100,33 +90,12 @@ class ContextualMemory:
em_results = self.em.search(query)
formatted_results = "\n".join(
[
f"- {result['memory'] if self.memory_provider == 'mem0' else result['context']}"
f"- {result['context']}"
for result in em_results
] # type: ignore # Invalid index type "str" for "str"; expected type "SupportsIndex | slice"
)
return f"Entities:\n{formatted_results}" if em_results else ""
def _fetch_user_context(self, query: str) -> str:
"""
Fetches and formats relevant user information from User Memory.
Args:
query (str): The search query to find relevant user memories.
Returns:
str: Formatted user memories as bullet points, or an empty string if none found.
"""
if self.um is None:
return ""
user_memories = self.um.search(query)
if not user_memories:
return ""
formatted_memories = "\n".join(
f"- {result['memory']}" for result in user_memories
)
return f"User memories/preferences:\n{formatted_memories}"
def _fetch_external_context(self, query: str) -> str:
"""
Fetches and formats relevant information from External Memory.
@@ -144,6 +113,6 @@ class ContextualMemory:
return ""
formatted_memories = "\n".join(
f"- {result['memory']}" for result in external_memories
f"- {result['context']}" for result in external_memories
)
return f"External memories:\n{formatted_memories}"
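
Reviewer note: after this change every memory source is formatted from a `context` key and empty sections are dropped when the final context is joined. The sketch below illustrates that flow with hypothetical helper names (`format_results`, `build_context`) and invented search results. It does not call the real memory classes.

```python
from typing import Any


def format_results(title: str, results: list[dict[str, Any]]) -> str:
    """Render search results as a titled bullet list, or '' when there are none."""
    if not results:
        return ""
    lines = "\n".join(f"- {result['context']}" for result in results)
    return f"{title}:\n{lines}"


def build_context(sections: list[str]) -> str:
    # filter(None, ...) removes empty strings so missing sources leave no blank lines.
    return "\n".join(filter(None, sections))


# Invented results, for illustration only.
entities = format_results("Entities", [{"context": "Acme is a client"}])
external = format_results("External memories", [])
context_text = build_context([entities, external])
print(context_text)
```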


@@ -27,11 +27,7 @@ class EntityMemory(Memory):
_memory_provider: Optional[str] = PrivateAttr()
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_provider = crew.memory_config.get("provider")
else:
memory_provider = None
memory_provider = embedder_config.get("provider") if embedder_config else None
if memory_provider == "mem0":
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
@@ -39,7 +35,8 @@ class EntityMemory(Memory):
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
storage = Mem0Storage(type="entities", crew=crew)
config = embedder_config.get("config")
storage = Mem0Storage(type="short_term", crew=crew, config=config)
else:
storage = (
storage


@@ -29,11 +29,7 @@ class ShortTermMemory(Memory):
_memory_provider: Optional[str] = PrivateAttr()
def __init__(self, crew=None, embedder_config=None, storage=None, path=None):
if crew and hasattr(crew, "memory_config") and crew.memory_config is not None:
memory_provider = crew.memory_config.get("provider")
else:
memory_provider = None
memory_provider = embedder_config.get("provider") if embedder_config else None
if memory_provider == "mem0":
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
@@ -41,7 +37,8 @@ class ShortTermMemory(Memory):
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
storage = Mem0Storage(type="short_term", crew=crew)
config = embedder_config.get("config")
storage = Mem0Storage(type="short_term", crew=crew, config=config)
else:
storage = (
storage


@@ -1,10 +1,10 @@
import os
from typing import Any, Dict, List
from collections import defaultdict
from mem0 import Memory, MemoryClient
from crewai.utilities.chromadb import sanitize_collection_name
from crewai.memory.storage.interface import Storage
from crewai.utilities.chromadb import sanitize_collection_name
MAX_AGENT_ID_LENGTH_MEM0 = 255
@@ -13,47 +13,159 @@ class Mem0Storage(Storage):
"""
Extends Storage to handle embedding and searching across entities using Mem0.
"""
def __init__(self, type, crew=None, config=None):
super().__init__()
supported_types = ["user", "short_term", "long_term", "entities", "external"]
if type not in supported_types:
raise ValueError(
f"Invalid type '{type}' for Mem0Storage. Must be one of: "
+ ", ".join(supported_types)
)
self._validate_type(type)
self.memory_type = type
self.crew = crew
self.config = config or {}
# TODO: Memory config will be removed in the future the config will be passed as a parameter
self.memory_config = self.config or getattr(crew, "memory_config", {}) or {}
# User ID is required for user memory type "user" since it's used as a unique identifier for the user.
user_id = self._get_user_id()
if type == "user" and not user_id:
raise ValueError("User ID is required for user memory type")
self._extract_config_values()
self._initialize_memory()
# API key in memory config overrides the environment variable
config = self._get_config()
mem0_api_key = config.get("api_key") or os.getenv("MEM0_API_KEY")
mem0_org_id = config.get("org_id")
mem0_project_id = config.get("project_id")
mem0_local_config = config.get("local_mem0_config")
def _validate_type(self, type):
supported_types = {"short_term", "long_term", "entities", "external"}
if type not in supported_types:
raise ValueError(
f"Invalid type '{type}' for Mem0Storage. Must be one of: {', '.join(supported_types)}"
)
# Initialize MemoryClient or Memory based on the presence of the mem0_api_key
if mem0_api_key:
if mem0_org_id and mem0_project_id:
self.memory = MemoryClient(
api_key=mem0_api_key, org_id=mem0_org_id, project_id=mem0_project_id
)
else:
self.memory = MemoryClient(api_key=mem0_api_key)
def _extract_config_values(self):
self.mem0_run_id = self.config.get("run_id")
self.includes = self.config.get("includes")
self.excludes = self.config.get("excludes")
self.custom_categories = self.config.get("custom_categories")
self.infer = self.config.get("infer", True)
def _initialize_memory(self):
api_key = self.config.get("api_key") or os.getenv("MEM0_API_KEY")
org_id = self.config.get("org_id")
project_id = self.config.get("project_id")
local_config = self.config.get("local_mem0_config")
if api_key:
self.memory = (
MemoryClient(api_key=api_key, org_id=org_id, project_id=project_id)
if org_id and project_id
else MemoryClient(api_key=api_key)
)
if self.custom_categories:
self.memory.update_project(custom_categories=self.custom_categories)
else:
if mem0_local_config and len(mem0_local_config):
self.memory = Memory.from_config(mem0_local_config)
else:
self.memory = Memory()
self.memory = (
Memory.from_config(local_config)
if local_config and len(local_config)
else Memory()
)
def _create_filter_for_search(self):
"""
Returns:
dict: A filter dictionary containing AND conditions for querying data.
- Includes user_id and agent_id if both are present.
- Includes user_id if only user_id is present.
- Includes agent_id if only agent_id is present.
- Includes run_id if memory_type is 'short_term' and mem0_run_id is present.
"""
filter = defaultdict(list)
if self.memory_type == "short_term" and self.mem0_run_id:
filter["AND"].append({"run_id": self.mem0_run_id})
else:
user_id = self.config.get("user_id", "")
agent_id = self.config.get("agent_id", "")
if user_id and agent_id:
filter["OR"].append({"user_id": user_id})
filter["OR"].append({"agent_id": agent_id})
elif user_id:
filter["AND"].append({"user_id": user_id})
elif agent_id:
filter["AND"].append({"agent_id": agent_id})
return filter
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
user_id = self.config.get("user_id", "")
assistant_message = [{"role" : "assistant","content" : value}]
base_metadata = {
"short_term": "short_term",
"long_term": "long_term",
"entities": "entity",
"external": "external"
}
# Shared base params
params: dict[str, Any] = {
"metadata": {"type": base_metadata[self.memory_type], **metadata},
"infer": self.infer
}
# MemoryClient-specific overrides
if isinstance(self.memory, MemoryClient):
params["includes"] = self.includes
params["excludes"] = self.excludes
params["output_format"] = "v1.1"
params["version"] = "v2"
if self.memory_type == "short_term" and self.mem0_run_id:
params["run_id"] = self.mem0_run_id
if user_id:
params["user_id"] = user_id
if agent_id := self.config.get("agent_id", self._get_agent_name()):
params["agent_id"] = agent_id
self.memory.add(assistant_message, **params)
def search(self,query: str,limit: int = 3,score_threshold: float = 0.35) -> List[Any]:
params = {
"query": query,
"limit": limit,
"version": "v2",
"output_format": "v1.1"
}
if user_id := self.config.get("user_id", ""):
params["user_id"] = user_id
memory_type_map = {
"short_term": {"type": "short_term"},
"long_term": {"type": "long_term"},
"entities": {"type": "entity"},
"external": {"type": "external"},
}
if self.memory_type in memory_type_map:
params["metadata"] = memory_type_map[self.memory_type]
if self.memory_type == "short_term":
params["run_id"] = self.mem0_run_id
# Discard the filters for now since we create the filters
# automatically when the crew is created.
params["filters"] = self._create_filter_for_search()
params['threshold'] = score_threshold
if isinstance(self.memory, Memory):
del params["metadata"], params["version"], params['output_format']
if params.get("run_id"):
del params["run_id"]
results = self.memory.search(**params)
# This makes it compatible for Contextual Memory to retrieve
for result in results["results"]:
result["context"] = result["memory"]
return [r for r in results["results"]]
def reset(self):
if self.memory:
self.memory.reset()
def _sanitize_role(self, role: str) -> str:
"""
@@ -61,75 +173,6 @@ class Mem0Storage(Storage):
"""
return role.replace("\n", "").replace(" ", "_").replace("/", "_")
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
user_id = self._get_user_id()
agent_name = self._get_agent_name()
params = None
if self.memory_type == "short_term":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "short_term", **metadata},
}
elif self.memory_type == "long_term":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "long_term", **metadata},
}
elif self.memory_type == "entities":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "entity", **metadata},
}
elif self.memory_type == "external":
params = {
"user_id": user_id,
"agent_id": agent_name,
"metadata": {"type": "external", **metadata},
}
if params:
if isinstance(self.memory, MemoryClient):
params["output_format"] = "v1.1"
self.memory.add(value, **params)
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
) -> List[Any]:
params = {"query": query, "limit": limit, "output_format": "v1.1"}
if user_id := self._get_user_id():
params["user_id"] = user_id
agent_name = self._get_agent_name()
if self.memory_type == "short_term":
params["agent_id"] = agent_name
params["metadata"] = {"type": "short_term"}
elif self.memory_type == "long_term":
params["agent_id"] = agent_name
params["metadata"] = {"type": "long_term"}
elif self.memory_type == "entities":
params["agent_id"] = agent_name
params["metadata"] = {"type": "entity"}
elif self.memory_type == "external":
params["agent_id"] = agent_name
params["metadata"] = {"type": "external"}
# Discard the filters for now since we create the filters
# automatically when the crew is created.
if isinstance(self.memory, Memory):
del params["metadata"], params["output_format"]
results = self.memory.search(**params)
return [r for r in results["results"] if r["score"] >= score_threshold]
def _get_user_id(self) -> str:
return self._get_config().get("user_id", "")
def _get_agent_name(self) -> str:
if not self.crew:
return ""
@@ -137,11 +180,4 @@ class Mem0Storage(Storage):
agents = self.crew.agents
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = "_".join(agents)
return sanitize_collection_name(name=agents,max_collection_length=MAX_AGENT_ID_LENGTH_MEM0)
def _get_config(self) -> Dict[str, Any]:
return self.config or getattr(self, "memory_config", {}).get("config", {}) or {}
def reset(self):
if self.memory:
self.memory.reset()
return sanitize_collection_name(name=agents, max_collection_length=MAX_AGENT_ID_LENGTH_MEM0)
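The branching in `_create_filter_for_search` is easier to follow in isolation. The sketch below copies that logic into a standalone function; `build_search_filter` and its parameters are hypothetical names, and it returns a plain `dict` rather than the `defaultdict` used in the diff. Note that the docstring in the diff describes AND conditions, while the code emits an `OR` list when both `user_id` and `agent_id` are configured.

```python
from collections import defaultdict
from typing import Any, Dict, Optional


def build_search_filter(
    memory_type: str,
    config: Dict[str, Any],
    run_id: Optional[str] = None,
) -> Dict[str, list]:
    """Standalone copy of the filter branching shown in the diff."""
    search_filter: Dict[str, list] = defaultdict(list)
    # Short-term memory scoped to a run ignores user_id and agent_id.
    if memory_type == "short_term" and run_id:
        search_filter["AND"].append({"run_id": run_id})
        return dict(search_filter)
    user_id = config.get("user_id", "")
    agent_id = config.get("agent_id", "")
    if user_id and agent_id:
        search_filter["OR"].append({"user_id": user_id})
        search_filter["OR"].append({"agent_id": agent_id})
    elif user_id:
        search_filter["AND"].append({"user_id": user_id})
    elif agent_id:
        search_filter["AND"].append({"agent_id": agent_id})
    return dict(search_filter)


if __name__ == "__main__":
    print(build_search_filter("long_term", {"user_id": "u1", "agent_id": "a1"}))
```

With neither identifier configured and no run ID, the filter is empty, so the search is not narrowed at all.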


@@ -4,12 +4,12 @@ import logging
import os
import shutil
import uuid
from typing import Any, Dict, List, Optional
from chromadb.api import ClientAPI
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.rag.storage.base_rag_storage import BaseRAGStorage
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
from crewai.utilities.chromadb import create_persistent_client
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path
@@ -60,17 +60,15 @@ class RAGStorage(BaseRAGStorage):
self.embedder_config = configurator.configure_embedder(self.embedder_config)
def _initialize_app(self):
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
self.app = create_persistent_client(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
self.collection = self.app.get_or_create_collection(
name=self.type, embedding_function=self.embedder_config
)


@@ -1,58 +0,0 @@
import warnings
from typing import Any, Dict, Optional
from crewai.memory.memory import Memory
class UserMemory(Memory):
"""
UserMemory class for handling user memory storage and retrieval.
Inherits from the Memory class and utilizes an instance of a class that
adheres to the Storage for data storage, specifically working with
MemoryItem instances.
"""
def __init__(self, crew=None):
warnings.warn(
"UserMemory is deprecated and will be removed in a future version. "
"Please use ExternalMemory instead.",
DeprecationWarning,
stacklevel=2,
)
try:
from crewai.memory.storage.mem0_storage import Mem0Storage
except ImportError:
raise ImportError(
"Mem0 is not installed. Please install it with `pip install mem0ai`."
)
storage = Mem0Storage(type="user", crew=crew)
super().__init__(storage)
def save(
self,
value,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
# TODO: Change this function since we want to take care of the case where we save memories for the usr
data = f"Remember the details about the user: {value}"
super().save(data, metadata)
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
):
results = self.storage.search(
query=query,
limit=limit,
score_threshold=score_threshold,
)
return results
def reset(self) -> None:
try:
self.storage.reset()
except Exception as e:
raise Exception(f"An error occurred while resetting the user memory: {e}")


@@ -1,8 +0,0 @@
from typing import Any, Dict, Optional
class UserMemoryItem:
def __init__(self, data: Any, user: str, metadata: Optional[Dict[str, Any]] = None):
self.data = data
self.user = user
self.metadata = metadata if metadata is not None else {}


@@ -0,0 +1 @@
"""RAG (Retrieval-Augmented Generation) infrastructure for CrewAI."""


@@ -0,0 +1 @@
"""Embedding components for RAG infrastructure."""


@@ -38,7 +38,14 @@ class EmbeddingConfigurator:
f"Unsupported embedding provider: {provider}, supported providers: {list(self.embedding_functions.keys())}"
)
embedding_function = self.embedding_functions[provider]
try:
embedding_function = self.embedding_functions[provider]
except ImportError as e:
missing_package = str(e).split()[-1]
raise ImportError(
f"{missing_package} is not installed. Please install it with: pip install {missing_package}"
)
return (
embedding_function(config)
if provider == "custom"
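The `EmbeddingConfigurator` hunk above turns an `ImportError` into an install hint. A minimal sketch of that pattern follows, assuming the failure comes from importing an optional module; `load_optional` is a hypothetical helper, not part of the diff. It reads the module name from the exception's `name` attribute, because with the standard "No module named 'x'" message, `str(e).split()[-1]` yields the name still wrapped in quotes.

```python
import importlib


def load_optional(module_name: str):
    """Import an optional dependency, re-raising with an install hint."""
    try:
        return importlib.import_module(module_name)
    except ImportError as e:
        # e.name holds the missing module; fall back to the requested name.
        missing_package = e.name or module_name
        raise ImportError(
            f"{missing_package} is not installed. "
            f"Please install it with: pip install {missing_package}"
        ) from e


if __name__ == "__main__":
    print(load_optional("json").__name__)
```

The hint assumes the pip distribution name matches the module name, which is not true for every package.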


@@ -0,0 +1 @@
"""Storage components for RAG infrastructure."""


@@ -67,6 +67,7 @@ class Task(BaseModel):
description: Descriptive text detailing task's purpose and execution.
expected_output: Clear definition of expected task outcome.
output_file: File path for storing task output.
create_directory: Whether to create the directory for output_file if it doesn't exist.
output_json: Pydantic model for structuring JSON output.
output_pydantic: Pydantic model for task output.
security_config: Security configuration including fingerprinting.
@@ -115,6 +116,10 @@ class Task(BaseModel):
description="A file path to be used to create a file output.",
default=None,
)
create_directory: Optional[bool] = Field(
description="Whether to create the directory for output_file if it doesn't exist.",
default=True,
)
output: Optional[TaskOutput] = Field(
description="Task output, it's final result after being executed", default=None
)
@@ -753,8 +758,10 @@ Follow these guidelines:
resolved_path = Path(self.output_file).expanduser().resolve()
directory = resolved_path.parent
if not directory.exists():
if self.create_directory and not directory.exists():
directory.mkdir(parents=True, exist_ok=True)
elif not self.create_directory and not directory.exists():
raise RuntimeError(f"Directory {directory} does not exist and create_directory is False")
with resolved_path.open("w", encoding="utf-8") as file:
if isinstance(result, dict):
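To illustrate the new `create_directory` flag on `Task`, here is a minimal standalone sketch of the same decision: create the parent directory when allowed, otherwise raise. The function `write_output` is hypothetical; only the path handling and the error message are taken from the hunk above.

```python
from pathlib import Path


def write_output(output_file: str, text: str, create_directory: bool = True) -> Path:
    """Write text to output_file, optionally creating its parent directory."""
    resolved_path = Path(output_file).expanduser().resolve()
    directory = resolved_path.parent
    if not directory.exists():
        if not create_directory:
            raise RuntimeError(
                f"Directory {directory} does not exist and create_directory is False"
            )
        directory.mkdir(parents=True, exist_ok=True)
    with resolved_path.open("w", encoding="utf-8") as file:
        file.write(text)
    return resolved_path
```

Since the field defaults to `True`, existing tasks keep the previous behavior; only callers that set `create_directory=False` see the `RuntimeError`.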


@@ -1,10 +1,9 @@
from typing import Any, Optional, Tuple
from typing import Any, Tuple
from pydantic import BaseModel, Field
from crewai.agent import Agent, LiteAgentOutput
from crewai.llm import LLM
from crewai.task import Task
from crewai.llm import BaseLLM
from crewai.tasks.task_output import TaskOutput
@@ -32,11 +31,11 @@ class LLMGuardrail:
def __init__(
self,
description: str,
llm: LLM,
llm: BaseLLM,
):
self.description = description
self.llm: LLM = llm
self.llm: BaseLLM = llm
def _validate_output(self, task_output: TaskOutput) -> LiteAgentOutput:
agent = Agent(


@@ -10,7 +10,6 @@ from .rpm_controller import RPMController
from .exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from .embedding_configurator import EmbeddingConfigurator
__all__ = [
"Converter",
@@ -24,5 +23,4 @@ __all__ = [
"RPMController",
"YamlParser",
"LLMContextLengthExceededException",
"EmbeddingConfigurator",
]


@@ -157,10 +157,6 @@ def get_llm_response(
from_agent=from_agent,
)
except Exception as e:
printer.print(
content=f"Error during LLM call: {e}",
color="red",
)
raise e
if not answer:
printer.print(
@@ -232,12 +228,17 @@ def handle_unknown_error(printer: Any, exception: Exception) -> None:
printer: Printer instance for output
exception: The exception that occurred
"""
error_message = str(exception)
if "litellm" in error_message:
return
printer.print(
content="An unknown error occurred. Please check the details below.",
color="red",
)
printer.print(
content=f"Error details: {exception}",
content=f"Error details: {error_message}",
color="red",
)
@@ -399,7 +400,7 @@ def show_agent_logs(
if not verbose:
return
agent_role = agent_role.split("\n")[0]
agent_role = agent_role.partition("\n")[0]
if formatted_answer is None:
# Start logs


@@ -1,6 +1,10 @@
import re
import portalocker
from chromadb import PersistentClient
from hashlib import md5
from typing import Optional
MIN_COLLECTION_LENGTH = 3
MAX_COLLECTION_LENGTH = 63
DEFAULT_COLLECTION = "default_collection"
@@ -60,3 +64,16 @@ def sanitize_collection_name(name: Optional[str], max_collection_length: int = M
sanitized = sanitized[:-1] + "z"
return sanitized
def create_persistent_client(path: str, **kwargs):
"""
Creates a persistent client for ChromaDB with a lock file to prevent
concurrent creations. Works for both multi-threads and multi-processes
environments.
"""
lockfile = f"chromadb-{md5(path.encode(), usedforsecurity=False).hexdigest()}.lock"
with portalocker.Lock(lockfile):
client = PersistentClient(path=path, **kwargs)
return client
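`create_persistent_client` derives its lock file name from the storage path, so every thread or process opening the same path contends on the same lock. The sketch below isolates only that naming step using the standard library; `lockfile_name` is a hypothetical helper, and the `portalocker.Lock` and `PersistentClient` calls from the diff are deliberately not reproduced here.

```python
from hashlib import md5


def lockfile_name(path: str) -> str:
    """Derive a deterministic lock file name from a ChromaDB storage path."""
    # usedforsecurity=False marks the hash as a plain fingerprint (Python 3.9+).
    digest = md5(path.encode(), usedforsecurity=False).hexdigest()
    return f"chromadb-{digest}.lock"


if __name__ == "__main__":
    print(lockfile_name("/tmp/crew/storage"))
```

The name is relative, so the lock file is created in the current working directory. Processes started from different directories would therefore not share a lock even for the same storage path.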


@@ -16,3 +16,4 @@ class _NotSpecified:
# Unlike `None`, which might be a valid value from the user, `NOT_SPECIFIED` allows
# us to distinguish between "not passed at all" and "explicitly passed None" or "[]".
NOT_SPECIFIED = _NotSpecified()
CREWAI_BASE_URL = "https://app.crewai.com/"


@@ -155,6 +155,7 @@ class CrewEvaluator:
)
console = Console()
console.print("\n")
console.print(table)
def evaluate(self, task_output: TaskOutput):


@@ -17,6 +17,9 @@ from .agent_events import (
AgentExecutionStartedEvent,
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentEvaluationStartedEvent,
AgentEvaluationCompletedEvent,
AgentEvaluationFailedEvent,
)
from .task_events import (
TaskStartedEvent,
@@ -74,6 +77,9 @@ __all__ = [
"AgentExecutionStartedEvent",
"AgentExecutionCompletedEvent",
"AgentExecutionErrorEvent",
"AgentEvaluationStartedEvent",
"AgentEvaluationCompletedEvent",
"AgentEvaluationFailedEvent",
"TaskStartedEvent",
"TaskCompletedEvent",
"TaskFailedEvent",


@@ -123,3 +123,28 @@ class AgentLogsExecutionEvent(BaseEvent):
type: str = "agent_logs_execution"
model_config = {"arbitrary_types_allowed": True}
# Agent Eval events
class AgentEvaluationStartedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
type: str = "agent_evaluation_started"
class AgentEvaluationCompletedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
metric_category: Any
score: Any
type: str = "agent_evaluation_completed"
class AgentEvaluationFailedEvent(BaseEvent):
agent_id: str
agent_role: str
task_id: str | None = None
iteration: int
error: str
type: str = "agent_evaluation_failed"


@@ -1,6 +1,5 @@
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from crewai.utilities.serialization import to_serializable
@@ -9,7 +8,7 @@ from crewai.utilities.serialization import to_serializable
class BaseEvent(BaseModel):
"""Base class for all events"""
timestamp: datetime = Field(default_factory=datetime.now)
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"
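The `BaseEvent` change swaps a naive `datetime.now` default for a timezone-aware UTC timestamp. A minimal sketch of the difference follows, using a standard-library dataclass as a stand-in for the pydantic model; `EventStamp` is a hypothetical name.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class EventStamp:
    # Stand-in for BaseEvent: the factory runs once per instance and
    # returns an aware datetime pinned to UTC.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


if __name__ == "__main__":
    print(EventStamp().timestamp.isoformat())  # ends with +00:00
    print(datetime.now().tzinfo)               # None: naive local time
```

Aware timestamps compare and serialize unambiguously across hosts in different time zones, which matters once events are sent to an external trace collector.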


@@ -47,6 +47,7 @@ class CrewKickoffCompletedEvent(CrewBaseEvent):
output: Any
type: str = "crew_kickoff_completed"
total_tokens: int = 0
class CrewKickoffFailedEvent(CrewBaseEvent):

Some files were not shown because too many files have changed in this diff