Compare commits

...

77 Commits

Author SHA1 Message Date
Brandon Hancock
6efee89399 refactor 2025-02-28 12:30:30 -05:00
Brandon Hancock
75d8e086a4 Refactor token tracking: Remove token_cost_process parameter for cleaner code 2025-02-28 12:17:14 -05:00
Brandon Hancock
ef48cbe971 Fix token tracking in Agent class to use token_process instead of _token_process 2025-02-28 11:44:51 -05:00
Brandon Hancock
88d8079dcd Fix token tracking in LangChainAgentAdapter and refactor token_process attribute to be public 2025-02-28 11:31:08 -05:00
Brandon Hancock
33ef612cd5 Merge branch 'main' into brandon/bring-back-byoa 2025-02-28 09:26:34 -05:00
Tony Kipkemboi
86825e1769 docs: add Qdrant vector search tool documentation (#2184)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:54:44 -05:00
Brandon Hancock (bhancock_ai)
7afc531fbb Improve hierarchical docs (#2244) 2025-02-27 13:38:21 -05:00
Brandon Hancock (bhancock_ai)
ed0490112b explain how to use event listener (#2245) 2025-02-27 13:32:16 -05:00
Brandon Hancock (bhancock_ai)
6c81acac00 Update docs (#2226) 2025-02-27 13:20:44 -05:00
Brandon Hancock (bhancock_ai)
2181166a62 Improve extract thought (#2223)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:44 -05:00
Brandon Hancock (bhancock_ai)
3f17789152 Support multiple router calls and address issue #2175 (#2231)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:44 -05:00
Brandon Hancock (bhancock_ai)
ed0b8e1563 Fix type issue (#2224)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:44 -05:00
Brandon Hancock (bhancock_ai)
4915420ed2 Add support for python 3.10 (#2230) 2025-02-27 13:20:44 -05:00
Lorenze Jay
271faa917b feat: Enhance agent knowledge setup with optional crew embedder (#2232)
- Modify `Agent` class to add `set_knowledge` method
- Allow setting embedder from crew-level configuration
- Remove `_set_knowledge` method from initialization
- Update `Crew` class to set agent knowledge during agent setup
- Add default implementation in `BaseAgent` for compatibility
2025-02-27 13:20:42 -05:00
Fernando Galves
e87eb57edc Update the constants.py file adding the list of foundation models available in Amazon Bedrock (#2170)
* Update constants.py

This PR updates the list of foundation models available in Amazon Bedrock to reflect the latest offerings.

* Update constants.py with inference profiles

Add the cross-region inference profiles to increase throughput and improve resiliency by routing your requests across multiple AWS Regions during peak utilization bursts.

* Update constants.py

Fix the model order

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
devin-ai-integration[bot]
e6e4bb15d7 feat: add context window size for o3-mini model (#2192)
* feat: add context window size for o3-mini model

Fixes #2191

Co-Authored-By: Joe Moura <joao@crewai.com>

* feat: add context window validation and tests

- Add validation for context window size bounds (1024-2097152)
- Add test for context window validation
- Fix test import error

Co-Authored-By: Joe Moura <joao@crewai.com>

* style: fix import sorting in llm_test.py

Co-Authored-By: Joe Moura <joao@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
92a3349d64 incorporating fix from @misrasaurabh1 with additional type fix (#2213)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Shivtej Narake
940f0647a9 [MINOR]support ChatOllama from langchain_ollama (#2158)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Vidit Ostwal
5db512bef6 Fixed the issue 2123 around memory command with CLI (#2155)
* Fixed the issue 2123 around memory command with CLI

* Fixed typo, added the recommendations

* Fixed Typo

* Fixed lint issue

* Fixed the print statement to include path as well

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Victor Degliame
48d2b8c320 fix: typo in 'delegate_work' and 'ask_question' promps (#2144)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
nikolaidk
f6b0b492a4 Update kickoff-async.mdx (#2138)
Missing mandatory field expected_output on task in example

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
4b63b29787 Revert "feat: add prompt observability code (#2027)" (#2211)
* Revert "feat: add prompt observability code (#2027)"

This reverts commit 90f1bee602.

* Fix issues with flows post merge

* Decoupling telemetry and ensure tests  (#2212)

* feat: Enhance event listener and telemetry tracking

- Update event listener to improve telemetry span handling
- Add execution_span field to Task for better tracing
- Modify event handling in EventListener to use new span tracking
- Remove debug print statements
- Improve test coverage for crew and flow events
- Update cassettes to reflect new event tracking behavior

* Remove telemetry references from Crew class

- Remove Telemetry import and initialization from Crew class
- Delete _telemetry attribute from class configuration
- Clean up unused telemetry-related code

* test: Improve crew verbose output test with event log filtering

- Filter out event listener logs in verbose output test
- Ensure no output when verbose is set to False
- Enhance test coverage for crew logging behavior

* dropped comment

* refactor: Improve telemetry span tracking in EventListener

- Remove `execution_span` from Task class
- Add `execution_spans` dictionary to EventListener to track spans
- Update task event handlers to use new span tracking mechanism
- Simplify span management across task lifecycle events

* lint

* Fix failing test

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Lorenze Jay
860efc3b42 Decoupling telemetry and ensure tests (#2212)
* feat: Enhance event listener and telemetry tracking

- Update event listener to improve telemetry span handling
- Add execution_span field to Task for better tracing
- Modify event handling in EventListener to use new span tracking
- Remove debug print statements
- Improve test coverage for crew and flow events
- Update cassettes to reflect new event tracking behavior

* Remove telemetry references from Crew class

- Remove Telemetry import and initialization from Crew class
- Delete _telemetry attribute from class configuration
- Clean up unused telemetry-related code

* test: Improve crew verbose output test with event log filtering

- Filter out event listener logs in verbose output test
- Ensure no output when verbose is set to False
- Enhance test coverage for crew logging behavior

* dropped comment

* refactor: Improve telemetry span tracking in EventListener

- Remove `execution_span` from Task class
- Add `execution_spans` dictionary to EventListener to track spans
- Update task event handlers to use new span tracking mechanism
- Simplify span management across task lifecycle events

* lint
2025-02-27 13:20:01 -05:00
Lorenze Jay
70ab4ad003 feat: Add LLM call events for improved observability (#2214)
* feat: Add LLM call events for improved observability

- Introduce new LLM call events: LLMCallStartedEvent, LLMCallCompletedEvent, and LLMCallFailedEvent
- Emit events for LLM calls and tool calls to provide better tracking and debugging
- Add event handling in the LLM class to track call lifecycle
- Update event bus to support new LLM-related events
- Add test cases to validate LLM event emissions

* feat: Add event handling for LLM call lifecycle events

- Implement event listeners for LLM call events in EventListener
- Add logging for LLM call start, completion, and failure events
- Import and register new LLM-specific event types

* less log

* refactor: Update LLM event response type to support Any

* refactor: Simplify LLM call completed event emission

Remove unnecessary LLMCallType conversion when emitting LLMCallCompletedEvent

* refactor: Update LLM event docstrings for clarity

Improve docstrings for LLM call events to more accurately describe their purpose and lifecycle

* feat: Add LLMCallFailedEvent emission for tool execution errors

Enhance error handling by emitting a specific event when tool execution fails during LLM calls
2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
6fb25a1af7 fix reset memory issue (#2182)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
ca9277ae4c Better support async flows (#2193)
* Better support async

* Drop coroutine
2025-02-27 13:20:01 -05:00
Jannik Maierhöfer
e58e544304 docs: add header image to langfuse guide (#2128)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
João Moura
84e0a9e686 cassetes 2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
6e0e9b30fe drop prints (#2181) 2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
f6393fd088 Check the right property for tool calling (#2160)
* Check the right property

* Fix failing tests

* Update cassettes

* Update cassettes again

* Update cassettes again 2

* Update cassettes again 3

* fix other test that fails in ci/cd

* Fix issues pointed out by lorenze
2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
64804682fc imporve HITL (#2169)
* imporve HITL

* fix failing test

* fix failing test part 2

* Drop extra logs that were causing confusion

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Lorenze Jay
cc7669ab39 WIP crew events emitter (#2048)
* WIP crew events emitter

* Refactor event handling and introduce new event types

- Migrate from global `emit` function to `event_bus.emit`
- Add new event types for task failures, tool usage, and agent execution
- Update event listeners and event bus to support more granular event tracking
- Remove deprecated event emission methods
- Improve event type consistency and add more detailed event information

* Add event emission for agent execution lifecycle

- Emit AgentExecutionStarted and AgentExecutionError events
- Update CrewAgentExecutor to use event_bus for tracking agent execution
- Refactor error handling to include event emission
- Minor code formatting improvements in task.py and crew_agent_executor.py
- Fix a typo in test file

* Refactor event system and add third-party event listeners

- Move event_bus import to correct module paths
- Introduce BaseEventListener abstract base class
- Add AgentOpsListener for third-party event tracking
- Update event listener initialization and setup
- Clean up event-related imports and exports

* Enhance event system type safety and error handling

- Improve type annotations for event bus and event types
- Add null checks for agent and task in event emissions
- Update import paths for base tool and base agent
- Refactor event listener type hints
- Remove unnecessary print statements
- Update test configurations to match new event handling

* Refactor event classes to improve type safety and naming consistency

- Rename event classes to have explicit 'Event' suffix (e.g., TaskStartedEvent)
- Update import statements and references across multiple files
- Remove deprecated events.py module
- Enhance event type hints and configurations
- Clean up unnecessary event-related code

* Add default model for CrewEvaluator and fix event import order

- Set default model to "gpt-4o-mini" in CrewEvaluator when no model is specified
- Reorder event-related imports in task.py to follow standard import conventions
- Update event bus initialization method return type hint
- Export event_bus in events/__init__.py

* Fix tool usage and event import handling

- Update tool usage to use `.get()` method when checking tool name
- Remove unnecessary `__all__` export list in events/__init__.py

* Refactor Flow and Agent event handling to use event_bus

- Remove `event_emitter` from Flow class and replace with `event_bus.emit()`
- Update Flow and Agent tests to use event_bus event listeners
- Remove redundant event emissions in Flow methods
- Add debug print statements in Flow execution
- Simplify event tracking in test cases

* Enhance event handling for Crew, Task, and Event classes

- Add crew name to failed event types (CrewKickoffFailedEvent, CrewTrainFailedEvent, CrewTestFailedEvent)
- Update Task events to remove redundant task and context attributes
- Refactor EventListener to use Logger for consistent event logging
- Add new event types for Crew train and test events
- Improve event bus event tracking in test cases

* Remove telemetry and tracing dependencies from Task and Flow classes

- Remove telemetry-related imports and private attributes from Task class
- Remove `_telemetry` attribute from Flow class
- Update event handling to emit events without direct telemetry tracking
- Simplify task and flow execution by removing explicit telemetry spans
- Move telemetry-related event handling to EventListener

* Clean up unused imports and event-related code

- Remove unused imports from various event and flow-related files
- Reorder event imports to follow standard conventions
- Remove unnecessary event type references
- Simplify import statements in event and flow modules

* Update crew test to validate verbose output and kickoff_for_each method

- Enhance test_crew_verbose_output to check specific listener log messages
- Modify test_kickoff_for_each_invalid_input to use Pydantic validation error
- Improve test coverage for crew logging and input validation

* Update crew test verbose output with improved emoji icons

- Replace task and agent completion icons from 👍 to 
- Enhance readability of test output logging
- Maintain consistent test coverage for crew verbose output

* Add MethodExecutionFailedEvent to handle flow method execution failures

- Introduce new MethodExecutionFailedEvent in flow_events module
- Update Flow class to catch and emit method execution failures
- Add event listener for method execution failure events
- Update event-related imports to include new event type
- Enhance test coverage for method execution failure handling

* Propagate method execution failures in Flow class

- Modify Flow class to re-raise exceptions after emitting MethodExecutionFailedEvent
- Reorder MethodExecutionFailedEvent import to maintain consistent import style

* Enable test coverage for Flow method execution failure event

- Uncomment pytest.raises() in test_events to verify exception handling
- Ensure test validates MethodExecutionFailedEvent emission during flow kickoff

* Add event handling for tool usage events

- Introduce event listeners for ToolUsageFinishedEvent and ToolUsageErrorEvent
- Log tool usage events with descriptive emoji icons ( and )
- Update event_listener to track and log tool usage lifecycle

* Reorder and clean up event imports in event_listener

- Reorganize imports for tool usage events and other event types
- Maintain consistent import ordering and remove unused imports
- Ensure clean and organized import structure in event_listener module

* moving to dedicated eventlistener

* dont forget crew level

* Refactor AgentOps event listener for crew-level tracking

- Modify AgentOpsListener to handle crew-level events
- Initialize and end AgentOps session at crew kickoff and completion
- Create agents for each crew member during session initialization
- Improve session management and event recording
- Clean up and simplify event handling logic

* Update test_events to validate tool usage error event handling

- Modify test to assert single error event with correct attributes
- Use pytest.raises() to verify error event generation
- Simplify error event validation in test case

* Improve AgentOps listener type hints and formatting

- Add string type hints for AgentOps classes to resolve potential import issues
- Clean up unnecessary whitespace and improve code indentation
- Simplify initialization and event handling logic

* Update test_events to validate multiple tool usage events

- Modify test to assert 75 events instead of a single error event
- Remove pytest.raises() check, allowing crew kickoff to complete
- Adjust event validation to support broader event tracking

* Rename event_bus to crewai_event_bus for improved clarity and specificity

- Replace all references to `event_bus` with `crewai_event_bus`
- Update import statements across multiple files
- Remove the old `event_bus.py` file
- Maintain existing event handling functionality

* Enhance EventListener with singleton pattern and color configuration

- Implement singleton pattern for EventListener to ensure single instance
- Add default color configuration using EMITTER_COLOR from constants
- Modify log method calls to use default color and remove redundant color parameters
- Improve initialization logic to prevent multiple initializations

* Add FlowPlotEvent and update event bus to support flow plotting

- Introduce FlowPlotEvent to track flow plotting events
- Replace Telemetry method with event bus emission in Flow.plot()
- Update event bus to support new FlowPlotEvent type
- Add test case to validate flow plotting event emission

* Remove RunType enum and clean up crew events module

- Delete unused RunType enum from crew_events.py
- Simplify crew_events.py by removing unnecessary enum definition
- Improve code clarity by removing unneeded imports

* Enhance event handling for tool usage and agent execution

- Add new events for tool usage: ToolSelectionErrorEvent, ToolValidateInputErrorEvent
- Improve error tracking and event emission in ToolUsage and LLM classes
- Update AgentExecutionStartedEvent to use task_prompt instead of inputs
- Add comprehensive test coverage for new event types and error scenarios

* Refactor event system and improve crew testing

- Extract base CrewEvent class to a new base_events.py module
- Update event imports across multiple event-related files
- Modify CrewTestStartedEvent to use eval_llm instead of openai_model_name
- Add LLM creation validation in crew testing method
- Improve type handling and event consistency

* Refactor task events to use base CrewEvent

- Move CrewEvent import from crew_events to base_events
- Remove unnecessary blank lines in task_events.py
- Simplify event class structure for task-related events

* Update AgentExecutionStartedEvent to use task_prompt

- Modify test_events.py to use task_prompt instead of inputs
- Simplify event input validation in test case
- Align with recent event system refactoring

* Improve type hinting for TaskCompletedEvent handler

- Add explicit type annotation for TaskCompletedEvent in event_listener.py
- Enhance type safety for event handling in EventListener

* Improve test_validate_tool_input_invalid_input with mock objects

- Add explicit mock objects for agent and action in test case
- Ensure proper string values for mock agent and action attributes
- Simplify test setup for ToolUsage validation method

* Remove ToolUsageStartedEvent emission in tool usage process

- Remove unnecessary event emission for tool usage start
- Simplify tool usage event handling
- Eliminate redundant event data preparation step

* refactor: clean up and organize imports in llm and flow modules

* test: Improve flow persistence test cases and logging
2025-02-27 13:20:01 -05:00
João Moura
377b64ac81 making flow verbsoe false by default 2025-02-27 13:20:01 -05:00
Tony Kipkemboi
470254c3e2 docs: update accordions and fix layout (#2110)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Vini Brasil
497190f823 Implement flow.state_utils.to_string method and improve types (#2161) 2025-02-27 13:20:01 -05:00
Eduardo Chiarotti
241adb8ed0 feat: add prompt observability code (#2027)
* feat: add prompt observability code

* feat: improve logic for llm call

* feat: add tests for traces

* feat: remove unused improt

* feat: add function to clear and add task traces

* feat: fix import

* feat:  chagne time

* feat: fix type checking issues

* feat: add fixed time to fix test

* feat: fix datetime test issue

* feat: add add task traces function

* feat: add same logic as entp

* feat: add start_time as reference for duplication of tool call

* feat: add max_depth

* feat: add protocols file to properly import on LLM

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
e60c6e66a4 Bugfix/fix backtick in agent response (#2159)
* updating prompts

* fix issue

* clean up thoughts as well

* drop trailing set
2025-02-27 13:20:01 -05:00
sharmasundip
afd01e3c0c fix user memory config issue (#2086)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Vidit Ostwal
81b8ae0abd Added functionality to have any llm run test functionality (#2071)
* Added functionality to have any llm run test functionality

* Fixed lint issues

* Fixed Linting issues

* Fixed unit test case

* Fixed unit test

* Fixed test case

* Fixed unit test case

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Vini Brasil
ae19437473 Implement Flow state export method (#2134)
This commit implements a method for exporting the state of a flow into a
JSON-serializable dictionary.

The idea is producing a human-readable version of state that can be
inspected or consumed by other systems, hence JSON and not pickling or
marshalling.

I consider it an export because it's a one-way process, meaning it
cannot be loaded back into Python because of complex types.
2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
66c66e3d84 Update docs (#2226) 2025-02-26 15:21:36 -05:00
Brandon Hancock (bhancock_ai)
b9b625a70d Improve extract thought (#2223)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-26 14:51:46 -05:00
Brandon Hancock (bhancock_ai)
b58253cacc Support multiple router calls and address issue #2175 (#2231)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-26 13:42:17 -05:00
Brandon Hancock (bhancock_ai)
fbf8732784 Fix type issue (#2224)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-26 13:27:41 -05:00
Brandon Hancock (bhancock_ai)
8fedbe49cb Add support for python 3.10 (#2230) 2025-02-26 13:24:31 -05:00
Lorenze Jay
1e8ee247ca feat: Enhance agent knowledge setup with optional crew embedder (#2232)
- Modify `Agent` class to add `set_knowledge` method
- Allow setting embedder from crew-level configuration
- Remove `_set_knowledge` method from initialization
- Update `Crew` class to set agent knowledge during agent setup
- Add default implementation in `BaseAgent` for compatibility
2025-02-26 12:10:43 -05:00
Fernando Galves
34d2993456 Update the constants.py file adding the list of foundation models available in Amazon Bedrock (#2170)
* Update constants.py

This PR updates the list of foundation models available in Amazon Bedrock to reflect the latest offerings.

* Update constants.py with inference profiles

Add the cross-region inference profiles to increase throughput and improve resiliency by routing your requests across multiple AWS Regions during peak utilization bursts.

* Update constants.py

Fix the model order

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-25 15:39:23 -05:00
devin-ai-integration[bot]
e3c5c174ee feat: add context window size for o3-mini model (#2192)
* feat: add context window size for o3-mini model

Fixes #2191

Co-Authored-By: Joe Moura <joao@crewai.com>

* feat: add context window validation and tests

- Add validation for context window size bounds (1024-2097152)
- Add test for context window validation
- Fix test import error

Co-Authored-By: Joe Moura <joao@crewai.com>

* style: fix import sorting in llm_test.py

Co-Authored-By: Joe Moura <joao@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-25 15:32:14 -05:00
Brandon Hancock (bhancock_ai)
b4e2db0306 incorporating fix from @misrasaurabh1 with additional type fix (#2213)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-25 15:29:21 -05:00
Shivtej Narake
9cc759ba32 [MINOR]support ChatOllama from langchain_ollama (#2158)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-25 15:19:36 -05:00
Vidit Ostwal
ac9f8b9d5a Fixed the issue 2123 around memory command with CLI (#2155)
* Fixed the issue 2123 around memory command with CLI

* Fixed typo, added the recommendations

* Fixed Typo

* Fixed lint issue

* Fixed the print statement to include path as well

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-25 12:29:33 -05:00
Victor Degliame
3d4a1e4b18 fix: typo in 'delegate_work' and 'ask_question' promps (#2144)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-25 12:16:04 -05:00
nikolaidk
123f302744 Update kickoff-async.mdx (#2138)
Missing mandatory field expected_output on task in example

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-25 12:12:27 -05:00
Brandon Hancock (bhancock_ai)
5bae78639e Revert "feat: add prompt observability code (#2027)" (#2211)
* Revert "feat: add prompt observability code (#2027)"

This reverts commit 90f1bee602.

* Fix issues with flows post merge

* Decoupling telemetry and ensure tests  (#2212)

* feat: Enhance event listener and telemetry tracking

- Update event listener to improve telemetry span handling
- Add execution_span field to Task for better tracing
- Modify event handling in EventListener to use new span tracking
- Remove debug print statements
- Improve test coverage for crew and flow events
- Update cassettes to reflect new event tracking behavior

* Remove telemetry references from Crew class

- Remove Telemetry import and initialization from Crew class
- Delete _telemetry attribute from class configuration
- Clean up unused telemetry-related code

* test: Improve crew verbose output test with event log filtering

- Filter out event listener logs in verbose output test
- Ensure no output when verbose is set to False
- Enhance test coverage for crew logging behavior

* dropped comment

* refactor: Improve telemetry span tracking in EventListener

- Remove `execution_span` from Task class
- Add `execution_spans` dictionary to EventListener to track spans
- Update task event handlers to use new span tracking mechanism
- Simplify span management across task lifecycle events

* lint

* Fix failing test

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-24 16:30:16 -05:00
Lorenze Jay
5235442a5b Decoupling telemetry and ensure tests (#2212)
* feat: Enhance event listener and telemetry tracking

- Update event listener to improve telemetry span handling
- Add execution_span field to Task for better tracing
- Modify event handling in EventListener to use new span tracking
- Remove debug print statements
- Improve test coverage for crew and flow events
- Update cassettes to reflect new event tracking behavior

* Remove telemetry references from Crew class

- Remove Telemetry import and initialization from Crew class
- Delete _telemetry attribute from class configuration
- Clean up unused telemetry-related code

* test: Improve crew verbose output test with event log filtering

- Filter out event listener logs in verbose output test
- Ensure no output when verbose is set to False
- Enhance test coverage for crew logging behavior

* dropped comment

* refactor: Improve telemetry span tracking in EventListener

- Remove `execution_span` from Task class
- Add `execution_spans` dictionary to EventListener to track spans
- Update task event handlers to use new span tracking mechanism
- Simplify span management across task lifecycle events

* lint
2025-02-24 12:24:35 -08:00
Lorenze Jay
c62fb615b1 feat: Add LLM call events for improved observability (#2214)
* feat: Add LLM call events for improved observability

- Introduce new LLM call events: LLMCallStartedEvent, LLMCallCompletedEvent, and LLMCallFailedEvent
- Emit events for LLM calls and tool calls to provide better tracking and debugging
- Add event handling in the LLM class to track call lifecycle
- Update event bus to support new LLM-related events
- Add test cases to validate LLM event emissions

* feat: Add event handling for LLM call lifecycle events

- Implement event listeners for LLM call events in EventListener
- Add logging for LLM call start, completion, and failure events
- Import and register new LLM-specific event types

* less log

* refactor: Update LLM event response type to support Any

* refactor: Simplify LLM call completed event emission

Remove unnecessary LLMCallType conversion when emitting LLMCallCompletedEvent

* refactor: Update LLM event docstrings for clarity

Improve docstrings for LLM call events to more accurately describe their purpose and lifecycle

* feat: Add LLMCallFailedEvent emission for tool execution errors

Enhance error handling by emitting a specific event when tool execution fails during LLM calls
2025-02-24 15:17:44 -05:00
Brandon Hancock (bhancock_ai)
78797c64b0 fix reset memory issue (#2182)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-24 14:51:58 -05:00
Brandon Hancock (bhancock_ai)
8a7584798b Better support async flows (#2193)
* Better support async

* Drop coroutine
2025-02-24 10:25:30 -05:00
Jannik Maierhöfer
b50772a38b docs: add header image to langfuse guide (#2128)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-21 10:11:55 -05:00
João Moura
96a7e8038f cassetes 2025-02-20 21:00:10 -06:00
Brandon Hancock (bhancock_ai)
ec050e5d33 drop prints (#2181) 2025-02-20 12:35:39 -05:00
Brandon Hancock (bhancock_ai)
e2ce65fc5b Check the right property for tool calling (#2160)
* Check the right property

* Fix failing tests

* Update cassettes

* Update cassettes again

* Update cassettes again 2

* Update cassettes again 3

* fix other test that fails in ci/cd

* Fix issues pointed out by lorenze
2025-02-20 12:12:52 -05:00
Brandon Hancock (bhancock_ai)
f8c74b4fbb Merge branch 'main' into brandon/bring-back-byoa 2025-02-14 15:57:16 -05:00
Brandon Hancock
134e7ab241 clean up RPM controller 2025-02-14 15:32:34 -05:00
Brandon Hancock
74e63621a5 Fix new errors 2025-02-14 15:29:20 -05:00
Brandon Hancock
8953af6133 Fix ruff issues 2025-02-14 15:24:29 -05:00
Brandon Hancock
2b438baad4 Fix issues 2025-02-14 15:11:58 -05:00
Brandon Hancock
185556b7e3 WIP 2025-02-14 11:40:23 -05:00
Brandon Hancock
1e23d37a14 more fixes 2025-02-13 15:11:51 -05:00
Brandon Hancock
df21f01441 more fixes 2025-02-13 15:08:29 -05:00
Brandon Hancock
b957fc1a18 More type fixues 2025-02-13 15:06:16 -05:00
Brandon Hancock
fd0e1bdd1a Fix more type issues 2025-02-13 15:03:17 -05:00
Brandon Hancock
265b37316b fix type issues 2025-02-13 15:00:27 -05:00
Brandon Hancock
ff32880a54 clean up 2025-02-13 14:55:11 -05:00
Brandon Hancock
a38483e1b4 It works! 2025-02-12 17:28:35 -05:00
Brandon Hancock
7910dc9337 wip 2025-02-12 11:10:00 -05:00
Brandon Hancock
796e50aba8 WIP 2025-02-10 16:11:29 -05:00
56 changed files with 4917 additions and 3213 deletions

3
.gitignore vendored
View File

@@ -21,4 +21,5 @@ crew_tasks_output.json
.mypy_cache
.ruff_cache
.venv
agentops.log
agentops.log
test_flow.html

View File

@@ -136,17 +136,21 @@ crewai test -n 5 -m gpt-3.5-turbo
### 8. Run
Run the crew.
Run the crew or flow.
```shell Terminal
crewai run
```
<Note>
Starting from version 0.103.0, the `crewai run` command can be used to run both standard crews and flows. For flows, it automatically detects the type from pyproject.toml and runs the appropriate command. This is now the recommended way to run both crews and flows.
</Note>
<Note>
Make sure to run these commands from the directory where your CrewAI project is set up.
Some commands may require additional configuration or setup within your project structure.
</Note>
### 9. Chat
Starting in version `0.98.0`, when you run the `crewai chat` command, you start an interactive session with your crew. The AI assistant will guide you by asking for necessary inputs to execute the crew. Once all inputs are provided, the crew will execute its tasks.
@@ -175,7 +179,6 @@ def crew(self) -> Crew:
```
</Note>
### 10. API Keys
When running ```crewai create crew``` command, the CLI will first show you the top 5 most common LLM providers and ask you to select one.

View File

@@ -0,0 +1,349 @@
---
title: 'Event Listeners'
description: 'Tap into CrewAI events to build custom integrations and monitoring'
---
# Event Listeners
CrewAI provides a powerful event system that allows you to listen for and react to various events that occur during the execution of your Crew. This feature enables you to build custom integrations, monitoring solutions, logging systems, or any other functionality that needs to be triggered based on CrewAI's internal events.
## How It Works
CrewAI uses an event bus architecture to emit events throughout the execution lifecycle. The event system is built on the following components:
1. **CrewAIEventsBus**: A singleton event bus that manages event registration and emission
2. **CrewEvent**: Base class for all events in the system
3. **BaseEventListener**: Abstract base class for creating custom event listeners
When specific actions occur in CrewAI (like a Crew starting execution, an Agent completing a task, or a tool being used), the system emits corresponding events. You can register handlers for these events to execute custom code when they occur.
## Creating a Custom Event Listener
To create a custom event listener, you need to:
1. Create a class that inherits from `BaseEventListener`
2. Implement the `setup_listeners` method
3. Register handlers for the events you're interested in
4. Create an instance of your listener in the appropriate file
Here's a simple example of a custom event listener class:
```python
from crewai.utilities.events import (
CrewKickoffStartedEvent,
CrewKickoffCompletedEvent,
AgentExecutionCompletedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
class MyCustomListener(BaseEventListener):
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event):
print(f"Crew '{event.crew_name}' has started execution!")
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
print(f"Crew '{event.crew_name}' has completed execution!")
print(f"Output: {event.output}")
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event):
print(f"Agent '{event.agent.role}' completed task")
print(f"Output: {event.output}")
```
## Properly Registering Your Listener
Simply defining your listener class isn't enough. You need to create an instance of it and ensure it's imported in your application. This ensures that:
1. The event handlers are registered with the event bus
2. The listener instance remains in memory (not garbage collected)
3. The listener is active when events are emitted
### Option 1: Import and Instantiate in Your Crew or Flow Implementation
The most important thing is to create an instance of your listener in the file where your Crew or Flow is defined and executed:
#### For Crew-based Applications
Create and import your listener at the top of your Crew implementation file:
```python
# In your crew.py file
from crewai import Agent, Crew, Task
from my_listeners import MyCustomListener
# Create an instance of your listener
my_listener = MyCustomListener()
class MyCustomCrew:
# Your crew implementation...
def crew(self):
return Crew(
agents=[...],
tasks=[...],
# ...
)
```
#### For Flow-based Applications
Create and import your listener at the top of your Flow implementation file:
```python
# In your main.py or flow.py file
from crewai.flow import Flow, listen, start
from my_listeners import MyCustomListener
# Create an instance of your listener
my_listener = MyCustomListener()
class MyCustomFlow(Flow):
# Your flow implementation...
@start()
def first_step(self):
# ...
```
This ensures that your listener is loaded and active when your Crew or Flow is executed.
### Option 2: Create a Package for Your Listeners
For a more structured approach, especially if you have multiple listeners:
1. Create a package for your listeners:
```
my_project/
├── listeners/
│ ├── __init__.py
│ ├── my_custom_listener.py
│ └── another_listener.py
```
2. In `my_custom_listener.py`, define your listener class and create an instance:
```python
# my_custom_listener.py
from crewai.utilities.events.base_event_listener import BaseEventListener
# ... import events ...
class MyCustomListener(BaseEventListener):
# ... implementation ...
# Create an instance of your listener
my_custom_listener = MyCustomListener()
```
3. In `__init__.py`, import the listener instances to ensure they're loaded:
```python
# __init__.py
from .my_custom_listener import my_custom_listener
from .another_listener import another_listener
# Optionally export them if you need to access them elsewhere
__all__ = ['my_custom_listener', 'another_listener']
```
4. Import your listeners package in your Crew or Flow file:
```python
# In your crew.py or flow.py file
import my_project.listeners # This loads all your listeners
class MyCustomCrew:
# Your crew implementation...
```
This is exactly how CrewAI's built-in `agentops_listener` is registered. In the CrewAI codebase, you'll find:
```python
# src/crewai/utilities/events/third_party/__init__.py
from .agentops_listener import agentops_listener
```
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
## Available Event Types
CrewAI provides a wide range of events that you can listen for:
### Crew Events
- **CrewKickoffStartedEvent**: Emitted when a Crew starts execution
- **CrewKickoffCompletedEvent**: Emitted when a Crew completes execution
- **CrewKickoffFailedEvent**: Emitted when a Crew fails to complete execution
- **CrewTestStartedEvent**: Emitted when a Crew starts testing
- **CrewTestCompletedEvent**: Emitted when a Crew completes testing
- **CrewTestFailedEvent**: Emitted when a Crew fails to complete testing
- **CrewTrainStartedEvent**: Emitted when a Crew starts training
- **CrewTrainCompletedEvent**: Emitted when a Crew completes training
- **CrewTrainFailedEvent**: Emitted when a Crew fails to complete training
### Agent Events
- **AgentExecutionStartedEvent**: Emitted when an Agent starts executing a task
- **AgentExecutionCompletedEvent**: Emitted when an Agent completes executing a task
- **AgentExecutionErrorEvent**: Emitted when an Agent encounters an error during execution
### Task Events
- **TaskStartedEvent**: Emitted when a Task starts execution
- **TaskCompletedEvent**: Emitted when a Task completes execution
- **TaskFailedEvent**: Emitted when a Task fails to complete execution
- **TaskEvaluationEvent**: Emitted when a Task is evaluated
### Tool Usage Events
- **ToolUsageStartedEvent**: Emitted when a tool execution is started
- **ToolUsageFinishedEvent**: Emitted when a tool execution is completed
- **ToolUsageErrorEvent**: Emitted when a tool execution encounters an error
- **ToolValidateInputErrorEvent**: Emitted when a tool input validation encounters an error
- **ToolExecutionErrorEvent**: Emitted when a tool execution encounters an error
- **ToolSelectionErrorEvent**: Emitted when there's an error selecting a tool
### Flow Events
- **FlowCreatedEvent**: Emitted when a Flow is created
- **FlowStartedEvent**: Emitted when a Flow starts execution
- **FlowFinishedEvent**: Emitted when a Flow completes execution
- **FlowPlotEvent**: Emitted when a Flow is plotted
- **MethodExecutionStartedEvent**: Emitted when a Flow method starts execution
- **MethodExecutionFinishedEvent**: Emitted when a Flow method completes execution
- **MethodExecutionFailedEvent**: Emitted when a Flow method fails to complete execution
### LLM Events
- **LLMCallStartedEvent**: Emitted when an LLM call starts
- **LLMCallCompletedEvent**: Emitted when an LLM call completes
- **LLMCallFailedEvent**: Emitted when an LLM call fails
## Event Handler Structure
Each event handler receives two parameters:
1. **source**: The object that emitted the event
2. **event**: The event instance, containing event-specific data
The structure of the event object depends on the event type, but all events inherit from `CrewEvent` and include:
- **timestamp**: The time when the event was emitted
- **type**: A string identifier for the event type
Additional fields vary by event type. For example, `CrewKickoffCompletedEvent` includes `crew_name` and `output` fields.
## Real-World Example: Integration with AgentOps
CrewAI includes an example of a third-party integration with [AgentOps](https://github.com/AgentOps-AI/agentops), a monitoring and observability platform for AI agents. Here's how it's implemented:
```python
from typing import Optional
from crewai.utilities.events import (
CrewKickoffCompletedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try:
import agentops
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener):
tool_event: Optional["agentops.ToolEvent"] = None
session: Optional["agentops.Session"] = None
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
if not AGENTOPS_INSTALLED:
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init()
for agent in source.agents:
if self.session:
self.session.create_agent(
name=agent.role,
agent_id=str(agent.id),
)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session:
self.session.end_session(
end_state="Success",
end_state_reason="Finished Execution",
)
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session:
self.session.record(self.tool_event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
```
This listener initializes an AgentOps session when a Crew starts, registers agents with AgentOps, tracks tool usage, and ends the session when the Crew completes.
The AgentOps listener is registered in CrewAI's event system through the import in `src/crewai/utilities/events/third_party/__init__.py`:
```python
from .agentops_listener import agentops_listener
```
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
## Advanced Usage: Scoped Handlers
For temporary event handling (useful for testing or specific operations), you can use the `scoped_handlers` context manager:
```python
from crewai.utilities.events import crewai_event_bus, CrewKickoffStartedEvent
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStartedEvent)
def temp_handler(source, event):
print("This handler only exists within this context")
# Do something that emits events
# Outside the context, the temporary handler is removed
```
## Use Cases
Event listeners can be used for a variety of purposes:
1. **Logging and Monitoring**: Track the execution of your Crew and log important events
2. **Analytics**: Collect data about your Crew's performance and behavior
3. **Debugging**: Set up temporary listeners to debug specific issues
4. **Integration**: Connect CrewAI with external systems like monitoring platforms, databases, or notification services
5. **Custom Behavior**: Trigger custom actions based on specific events
## Best Practices
1. **Keep Handlers Light**: Event handlers should be lightweight and avoid blocking operations
2. **Error Handling**: Include proper error handling in your event handlers to prevent exceptions from affecting the main execution
3. **Cleanup**: If your listener allocates resources, ensure they're properly cleaned up
4. **Selective Listening**: Only listen for events you actually need to handle
5. **Testing**: Test your event listeners in isolation to ensure they behave as expected
By leveraging CrewAI's event system, you can extend its functionality and integrate it seamlessly with your existing infrastructure.

View File

@@ -150,12 +150,12 @@ final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
````
```
```text Output
---- Final Output ----
Second method received: Output from first_method
````
```
</CodeGroup>
@@ -738,3 +738,34 @@ Also, check out our YouTube video on how to use flows in CrewAI below!
referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen
></iframe>
## Running Flows
There are two ways to run a flow:
### Using the Flow API
You can run a flow programmatically by creating an instance of your flow class and calling the `kickoff()` method:
```python
flow = ExampleFlow()
result = flow.kickoff()
```
### Using the CLI
Starting from version 0.103.0, you can run flows using the `crewai run` command:
```shell
crewai run
```
This command automatically detects if your project is a flow (based on the `type = "flow"` setting in your pyproject.toml) and runs it accordingly. This is the recommended way to run flows from the command line.
For backward compatibility, you can also use:
```shell
crewai flow kickoff
```
However, the `crewai run` command is now the preferred method as it works for both crews and flows.

View File

@@ -506,7 +506,7 @@ my_crew = Crew(
)
```
### Resetting Memory
### Resetting Memory via cli
```shell
crewai reset-memories [OPTIONS]
@@ -520,8 +520,46 @@ crewai reset-memories [OPTIONS]
| `-s`, `--short` | Reset SHORT TERM memory. | Flag (boolean) | False |
| `-e`, `--entities` | Reset ENTITIES memory. | Flag (boolean) | False |
| `-k`, `--kickoff-outputs` | Reset LATEST KICKOFF TASK OUTPUTS. | Flag (boolean) | False |
| `-kn`, `--knowledge` | Reset KNOWLEDEGE storage | Flag (boolean) | False |
| `-a`, `--all` | Reset ALL memories. | Flag (boolean) | False |
Note: To use the cli command you need to have your crew in a file called crew.py in the same directory.
### Resetting Memory via crew object
```python
my_crew = Crew(
agents=[...],
tasks=[...],
process=Process.sequential,
memory=True,
verbose=True,
embedder={
"provider": "custom",
"config": {
"embedder": CustomEmbedder()
}
}
)
my_crew.reset_memories(command_type = 'all') # Resets all the memory
```
#### Resetting Memory Options
| Command Type | Description |
| :----------------- | :------------------------------- |
| `long` | Reset LONG TERM memory. |
| `short` | Reset SHORT TERM memory. |
| `entities` | Reset ENTITIES memory. |
| `kickoff_outputs` | Reset LATEST KICKOFF TASK OUTPUTS. |
| `knowledge` | Reset KNOWLEDGE memory. |
| `all` | Reset ALL memories. |
## Benefits of Using CrewAI's Memory System

View File

@@ -48,7 +48,6 @@ Define a crew with a designated manager and establish a clear chain of command.
</Tip>
```python Code
from langchain_openai import ChatOpenAI
from crewai import Crew, Process, Agent
# Agents are defined with attributes for backstory, cache, and verbose mode
@@ -56,38 +55,51 @@ researcher = Agent(
role='Researcher',
goal='Conduct in-depth analysis',
backstory='Experienced data analyst with a knack for uncovering hidden trends.',
cache=True,
verbose=False,
# tools=[] # This can be optionally specified; defaults to an empty list
use_system_prompt=True, # Enable or disable system prompts for this agent
max_rpm=30, # Limit on the number of requests per minute
max_iter=5 # Maximum number of iterations for a final answer
)
writer = Agent(
role='Writer',
goal='Create engaging content',
backstory='Creative writer passionate about storytelling in technical domains.',
cache=True,
verbose=False,
# tools=[] # Optionally specify tools; defaults to an empty list
use_system_prompt=True, # Enable or disable system prompts for this agent
max_rpm=30, # Limit on the number of requests per minute
max_iter=5 # Maximum number of iterations for a final answer
)
# Establishing the crew with a hierarchical process and additional configurations
project_crew = Crew(
tasks=[...], # Tasks to be delegated and executed under the manager's supervision
agents=[researcher, writer],
manager_llm=ChatOpenAI(temperature=0, model="gpt-4"), # Mandatory if manager_agent is not set
process=Process.hierarchical, # Specifies the hierarchical management approach
respect_context_window=True, # Enable respect of the context window for tasks
memory=True, # Enable memory usage for enhanced task execution
manager_agent=None, # Optional: explicitly set a specific agent as manager instead of the manager_llm
planning=True, # Enable planning feature for pre-execution strategy
manager_llm="gpt-4o", # Specify which LLM the manager should use
process=Process.hierarchical,
planning=True,
)
```
### Using a Custom Manager Agent
Alternatively, you can create a custom manager agent with specific attributes tailored to your project's management needs. This gives you more control over the manager's behavior and capabilities.
```python
# Define a custom manager agent
manager = Agent(
role="Project Manager",
goal="Efficiently manage the crew and ensure high-quality task completion",
backstory="You're an experienced project manager, skilled in overseeing complex projects and guiding teams to success.",
allow_delegation=True,
)
# Use the custom manager in your crew
project_crew = Crew(
tasks=[...],
agents=[researcher, writer],
manager_agent=manager, # Use your custom manager agent
process=Process.hierarchical,
planning=True,
)
```
<Tip>
For more details on creating and customizing a manager agent, check out the [Custom Manager Agent documentation](https://docs.crewai.com/how-to/custom-manager-agent#custom-manager-agent).
</Tip>
### Workflow in Action
1. **Task Assignment**: The manager assigns tasks strategically, considering each agent's capabilities and available tools.
@@ -97,4 +109,4 @@ project_crew = Crew(
## Conclusion
Adopting the hierarchical process in CrewAI, with the correct configurations and understanding of the system's capabilities, facilitates an organized and efficient approach to project management.
Utilize the advanced features and customizations to tailor the workflow to your specific needs, ensuring optimal task execution and project success.
Utilize the advanced features and customizations to tailor the workflow to your specific needs, ensuring optimal task execution and project success.

View File

@@ -54,7 +54,8 @@ coding_agent = Agent(
# Create a task that requires code execution
data_analysis_task = Task(
description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent
agent=coding_agent,
expected_output="The average age of the participants."
)
# Create a crew and add the task
@@ -116,4 +117,4 @@ async def async_multiple_crews():
# Run the async function
asyncio.run(async_multiple_crews())
```
```

View File

@@ -10,6 +10,8 @@ This notebook demonstrates how to integrate **Langfuse** with **CrewAI** using O
> **What is Langfuse?** [Langfuse](https://langfuse.com) is an open-source LLM engineering platform. It provides tracing and monitoring capabilities for LLM applications, helping developers debug, analyze, and optimize their AI systems. Langfuse integrates with various tools and frameworks via native integrations, OpenTelemetry, and APIs/SDKs.
[![Langfuse Overview Video](https://github.com/user-attachments/assets/3926b288-ff61-4b95-8aa1-45d041c70866)](https://langfuse.com/watch-demo)
## Get Started
We'll walk through a simple example of using CrewAI and integrating it with Langfuse via OpenTelemetry using OpenLit.

View File

@@ -139,6 +139,7 @@
"tools/nl2sqltool",
"tools/pdfsearchtool",
"tools/pgsearchtool",
"tools/qdrantvectorsearchtool",
"tools/scrapewebsitetool",
"tools/seleniumscrapingtool",
"tools/spidertool",

View File

@@ -0,0 +1,271 @@
---
title: 'Qdrant Vector Search Tool'
description: 'Semantic search capabilities for CrewAI agents using Qdrant vector database'
icon: magnifying-glass-plus
---
# `QdrantVectorSearchTool`
The Qdrant Vector Search Tool enables semantic search capabilities in your CrewAI agents by leveraging [Qdrant](https://qdrant.tech/), a vector similarity search engine. This tool allows your agents to search through documents stored in a Qdrant collection using semantic similarity.
## Installation
Install the required packages:
```bash
uv pip install 'crewai[tools] qdrant-client'
```
## Basic Usage
Here's a minimal example of how to use the tool:
```python
from crewai import Agent
from crewai_tools import QdrantVectorSearchTool
# Initialize the tool
qdrant_tool = QdrantVectorSearchTool(
qdrant_url="your_qdrant_url",
qdrant_api_key="your_qdrant_api_key",
collection_name="your_collection"
)
# Create an agent that uses the tool
agent = Agent(
role="Research Assistant",
goal="Find relevant information in documents",
tools=[qdrant_tool]
)
# The tool will automatically use OpenAI embeddings
# and return the 3 most relevant results with scores > 0.35
```
## Complete Working Example
Here's a complete example showing how to:
1. Extract text from a PDF
2. Generate embeddings using OpenAI
3. Store in Qdrant
4. Create a CrewAI agentic RAG workflow for semantic search
```python
import os
import uuid
import pdfplumber
from openai import OpenAI
from dotenv import load_dotenv
from crewai import Agent, Task, Crew, Process, LLM
from crewai_tools import QdrantVectorSearchTool
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, Distance, VectorParams
# Load environment variables
load_dotenv()
# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Extract text from PDF
def extract_text_from_pdf(pdf_path):
text = []
with pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
page_text = page.extract_text()
if page_text:
text.append(page_text.strip())
return text
# Generate OpenAI embeddings
def get_openai_embedding(text):
response = client.embeddings.create(
input=text,
model="text-embedding-3-small"
)
return response.data[0].embedding
# Store text and embeddings in Qdrant
def load_pdf_to_qdrant(pdf_path, qdrant, collection_name):
# Extract text from PDF
text_chunks = extract_text_from_pdf(pdf_path)
# Create Qdrant collection
if qdrant.collection_exists(collection_name):
qdrant.delete_collection(collection_name)
qdrant.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
)
# Store embeddings
points = []
for chunk in text_chunks:
embedding = get_openai_embedding(chunk)
points.append(PointStruct(
id=str(uuid.uuid4()),
vector=embedding,
payload={"text": chunk}
))
qdrant.upsert(collection_name=collection_name, points=points)
# Initialize Qdrant client and load data
qdrant = QdrantClient(
url=os.getenv("QDRANT_URL"),
api_key=os.getenv("QDRANT_API_KEY")
)
collection_name = "example_collection"
pdf_path = "path/to/your/document.pdf"
load_pdf_to_qdrant(pdf_path, qdrant, collection_name)
# Initialize Qdrant search tool
qdrant_tool = QdrantVectorSearchTool(
qdrant_url=os.getenv("QDRANT_URL"),
qdrant_api_key=os.getenv("QDRANT_API_KEY"),
collection_name=collection_name,
limit=3,
score_threshold=0.35
)
# Create CrewAI agents
search_agent = Agent(
role="Senior Semantic Search Agent",
goal="Find and analyze documents based on semantic search",
backstory="""You are an expert research assistant who can find relevant
information using semantic search in a Qdrant database.""",
tools=[qdrant_tool],
verbose=True
)
answer_agent = Agent(
role="Senior Answer Assistant",
goal="Generate answers to questions based on the context provided",
backstory="""You are an expert answer assistant who can generate
answers to questions based on the context provided.""",
tools=[qdrant_tool],
verbose=True
)
# Define tasks
search_task = Task(
description="""Search for relevant documents about the {query}.
Your final answer should include:
- The relevant information found
- The similarity scores of the results
- The metadata of the relevant documents""",
agent=search_agent
)
answer_task = Task(
description="""Given the context and metadata of relevant documents,
generate a final answer based on the context.""",
agent=answer_agent
)
# Run CrewAI workflow
crew = Crew(
agents=[search_agent, answer_agent],
tasks=[search_task, answer_task],
process=Process.sequential,
verbose=True
)
result = crew.kickoff(
inputs={"query": "What is the role of X in the document?"}
)
print(result)
```
## Tool Parameters
### Required Parameters
- `qdrant_url` (str): The URL of your Qdrant server
- `qdrant_api_key` (str): API key for authentication with Qdrant
- `collection_name` (str): Name of the Qdrant collection to search
### Optional Parameters
- `limit` (int): Maximum number of results to return (default: 3)
- `score_threshold` (float): Minimum similarity score threshold (default: 0.35)
- `custom_embedding_fn` (Callable[[str], list[float]]): Custom function for text vectorization
## Search Parameters
The tool accepts these parameters in its schema:
- `query` (str): The search query to find similar documents
- `filter_by` (str, optional): Metadata field to filter on
- `filter_value` (str, optional): Value to filter by
## Return Format
The tool returns results in JSON format:
```json
[
{
"metadata": {
// Any metadata stored with the document
},
"context": "The actual text content of the document",
"distance": 0.95 // Similarity score
}
]
```
## Default Embedding
By default, the tool uses OpenAI's `text-embedding-3-small` model for vectorization. This requires:
- OpenAI API key set in environment: `OPENAI_API_KEY`
## Custom Embeddings
Instead of using the default embedding model, you might want to use your own embedding function in cases where you:
1. Want to use a different embedding model (e.g., Cohere, HuggingFace, Ollama models)
2. Need to reduce costs by using open-source embedding models
3. Have specific requirements for vector dimensions or embedding quality
4. Want to use domain-specific embeddings (e.g., for medical or legal text)
Here's an example using a HuggingFace model:
```python
from transformers import AutoTokenizer, AutoModel
import torch
# Load model and tokenizer
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
model = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
def custom_embeddings(text: str) -> list[float]:
# Tokenize and get model outputs
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
outputs = model(**inputs)
# Use mean pooling to get text embedding
embeddings = outputs.last_hidden_state.mean(dim=1)
# Convert to list of floats and return
return embeddings[0].tolist()
# Use custom embeddings with the tool
tool = QdrantVectorSearchTool(
qdrant_url="your_url",
qdrant_api_key="your_key",
collection_name="your_collection",
custom_embedding_fn=custom_embeddings # Pass your custom function
)
```
## Error Handling
The tool handles these specific errors:
- Raises ImportError if `qdrant-client` is not installed (with option to auto-install)
- Raises ValueError if `QDRANT_URL` is not set
- Prompts to install `qdrant-client` if missing using `uv add qdrant-client`
## Environment Variables
Required environment variables:
```bash
export QDRANT_URL="your_qdrant_url" # If not provided in constructor
export QDRANT_API_KEY="your_api_key" # If not provided in constructor
export OPENAI_API_KEY="your_openai_key" # If using default embeddings

View File

@@ -1,7 +1,7 @@
import re
import shutil
import subprocess
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from typing import Any, Dict, List, Literal, Optional, Sequence, Union, cast
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -114,7 +114,6 @@ class Agent(BaseAgent):
@model_validator(mode="after")
def post_init_setup(self):
self._set_knowledge()
self.agent_ops_agent_name = self.role
self.llm = create_llm(self.llm)
@@ -134,8 +133,11 @@ class Agent(BaseAgent):
self.cache_handler = CacheHandler()
self.set_cache_handler(self.cache_handler)
def _set_knowledge(self):
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
try:
if self.embedder is None and crew_embedder:
self.embedder = crew_embedder
if self.knowledge_sources:
full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)")
knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}"
@@ -168,27 +170,19 @@ class Agent(BaseAgent):
Output of the agent
"""
if self.tools_handler:
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalli
task_prompt = task.prompt()
# If the task requires output in JSON or Pydantic format,
# append specific instructions to the task prompt to ensure
# that the final answer does not include any code block markers
if task.output_json or task.output_pydantic:
# Generate the schema based on the output format
if task.output_json:
# schema = json.dumps(task.output_json, indent=2)
schema = generate_model_description(task.output_json)
task_prompt += "\n" + self.i18n.slice(
"formatted_task_instructions"
).format(output_format=schema)
elif task.output_pydantic:
schema = generate_model_description(task.output_pydantic)
task_prompt += "\n" + self.i18n.slice(
"formatted_task_instructions"
).format(output_format=schema)
# Choose the output format, preferring output_json if available
output_format = (
task.output_json if task.output_json else task.output_pydantic
)
schema = generate_model_description(cast(type, output_format))
task_prompt += f"\n{self.i18n.slice('formatted_task_instructions').format(output_format=schema)}"
if context:
task_prompt = self.i18n.slice("task_with_context").format(
@@ -274,9 +268,6 @@ class Agent(BaseAgent):
raise e
result = self.execute_task(task, context, tools)
if self.max_rpm and self._rpm_controller:
self._rpm_controller.stop_rpm_counter()
# If there was any tool in self.tools_results that had result_as_answer
# set to True, return the results of the last tool that had
# result_as_answer set to True
@@ -336,7 +327,7 @@ class Agent(BaseAgent):
request_within_rpm_limit=(
self._rpm_controller.check_or_wait if self._rpm_controller else None
),
callbacks=[TokenCalcHandler(self._token_process)],
callbacks=[TokenCalcHandler(self.token_process)],
)
def get_delegation_tools(self, agents: List[BaseAgent]):

View File

@@ -73,20 +73,27 @@ class BaseAgent(ABC, BaseModel):
Increment formatting errors.
copy() -> "BaseAgent":
Create a copy of the agent.
set_rpm_controller(rpm_controller: RPMController) -> None:
set_rpm_controller(rpm_controller: Optional[RPMController] = None) -> None:
Set the rpm controller for the agent.
set_private_attrs() -> "BaseAgent":
Set private attributes.
configure_executor(cache_handler: CacheHandler, rpm_controller: RPMController) -> None:
Configure the agent's executor with both cache and RPM handling.
"""
__hash__ = object.__hash__ # type: ignore
model_config = {
"arbitrary_types_allowed": True,
}
_logger: Logger = PrivateAttr(default_factory=lambda: Logger(verbose=False))
_rpm_controller: Optional[RPMController] = PrivateAttr(default=None)
_request_within_rpm_limit: Any = PrivateAttr(default=None)
_original_role: Optional[str] = PrivateAttr(default=None)
_original_goal: Optional[str] = PrivateAttr(default=None)
_original_backstory: Optional[str] = PrivateAttr(default=None)
_token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess)
token_process: TokenProcess = Field(default_factory=TokenProcess, exclude=True)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
formatting_errors: int = Field(
default=0, description="Number of formatting errors."
@@ -196,8 +203,6 @@ class BaseAgent(ABC, BaseModel):
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger
)
if not self._token_process:
self._token_process = TokenProcess()
return self
@@ -217,8 +222,7 @@ class BaseAgent(ABC, BaseModel):
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger
)
if not self._token_process:
self._token_process = TokenProcess()
return self
@property
@@ -266,7 +270,7 @@ class BaseAgent(ABC, BaseModel):
"_logger",
"_rpm_controller",
"_request_within_rpm_limit",
"_token_process",
"token_process",
"agent_executor",
"tools",
"tools_handler",
@@ -337,17 +341,49 @@ class BaseAgent(ABC, BaseModel):
if self.cache:
self.cache_handler = cache_handler
self.tools_handler.cache = cache_handler
self.create_agent_executor()
# Only create the executor if it hasn't been created yet.
if self.agent_executor is None:
self.create_agent_executor()
def increment_formatting_errors(self) -> None:
self.formatting_errors += 1
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.
Args:
rpm_controller: An instance of the RPMController class.
def set_rpm_controller(
self, rpm_controller: Optional[RPMController] = None
) -> None:
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.create_agent_executor()
Set the RPM controller for the agent. If no rpm_controller is provided, then:
- use self.max_rpm if set, or
- if self.crew exists and has max_rpm, use that.
"""
if self._rpm_controller is None:
if rpm_controller is not None:
self._rpm_controller = rpm_controller
elif self.max_rpm:
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger
)
elif self.crew and getattr(self.crew, "max_rpm", None):
self._rpm_controller = RPMController(
max_rpm=self.crew.max_rpm, logger=self._logger
)
# else: no rpm limit provided leave the controller None
if self.agent_executor is None:
self.create_agent_executor()
def configure_executor(
self, cache_handler: CacheHandler, rpm_controller: Optional[RPMController]
) -> None:
"""Configure the agent's executor with both cache and RPM handling.
This method delegates to set_cache_handler and set_rpm_controller, applying the configuration
only if the respective flags or values are set.
"""
if self.cache:
self.set_cache_handler(cache_handler)
# Use the injected RPM controller rather than auto-creating one
if rpm_controller:
self.set_rpm_controller(rpm_controller)
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
pass

View File

@@ -31,11 +31,11 @@ class OutputConverter(BaseModel, ABC):
)
@abstractmethod
def to_pydantic(self, current_attempt=1):
def to_pydantic(self, current_attempt=1) -> BaseModel:
"""Convert text to pydantic."""
pass
@abstractmethod
def to_json(self, current_attempt=1):
def to_json(self, current_attempt=1) -> dict:
"""Convert text to json."""
pass

View File

@@ -88,7 +88,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
tool.name: tool for tool in self.tools
}
self.stop = stop_words
self.llm.stop = list(set(self.llm.stop + self.stop))
self.llm.stop = list(set((self.llm.stop or []) + self.stop))
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
if "system" in self.prompt:

View File

@@ -0,0 +1,468 @@
from typing import Any, List, Optional, Type, Union, cast
from pydantic import Field, field_validator
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.task import Task
from crewai.tools import BaseTool
from crewai.tools.base_tool import Tool
from crewai.utilities.converter import Converter, generate_model_description
from crewai.utilities.token_counter_callback import (
LangChainTokenCounter,
LiteLLMTokenCounter,
)
class LangChainAgentAdapter(BaseAgent):
"""
Adapter class to wrap a LangChain agent and make it compatible with CrewAI's BaseAgent interface.
Note:
- This adapter does not require LangChain as a dependency.
- It wraps an external LangChain agent (passed as any type) and delegates calls
such as execute_task() to the LangChain agent's invoke() method.
- Extended logic is added to build prompts, incorporate memory, knowledge, training hints,
and now a human feedback loop similar to what is done in CrewAgentExecutor.
"""
langchain_agent: Any = Field(
...,
description="The wrapped LangChain runnable agent instance. It is expected to have an 'invoke' method.",
)
tools: Optional[List[Union[BaseTool, Any]]] = Field(
default_factory=list,
description="Tools at the agent's disposal. Accepts both CrewAI BaseTool instances and other tools.",
)
function_calling_llm: Optional[Any] = Field(
default=None, description="Optional function calling LLM."
)
step_callback: Optional[Any] = Field(
default=None,
description="Callback executed after each step of agent execution.",
)
allow_code_execution: Optional[bool] = Field(
default=False, description="Enable code execution for the agent."
)
multimodal: bool = Field(
default=False, description="Whether the agent is multimodal."
)
i18n: Any = None
crew: Any = None
knowledge: Any = None
token_process: TokenProcess = Field(default_factory=TokenProcess, exclude=True)
token_callback: Optional[Any] = None
class Config:
arbitrary_types_allowed = True
@field_validator("tools", mode="before")
def convert_tools(cls, value):
"""Ensure tools are valid CrewAI BaseTool instances."""
if not value:
return value
new_tools = []
for tool in value:
# If tool is already a CrewAI BaseTool instance, keep it as is.
if isinstance(tool, BaseTool):
new_tools.append(tool)
else:
new_tools.append(Tool.from_langchain(tool))
return new_tools
def _extract_text(self, message: Any) -> str:
"""
Helper to extract plain text from a message object.
This checks if the message is a dict with a "content" key, or has a "content" attribute,
or if it's a tuple from LangGraph's message format.
"""
# Handle LangGraph message tuple format (role, content)
if isinstance(message, tuple) and len(message) == 2:
return str(message[1])
# Handle dictionary with content key
elif isinstance(message, dict):
if "content" in message:
return message["content"]
# Handle LangGraph message format with additional metadata
elif "messages" in message and message["messages"]:
last_message = message["messages"][-1]
if isinstance(last_message, tuple) and len(last_message) == 2:
return str(last_message[1])
return self._extract_text(last_message)
# Handle object with content attribute
elif hasattr(message, "content") and isinstance(
getattr(message, "content"), str
):
return getattr(message, "content")
# Handle string directly
elif isinstance(message, str):
return message
# Default fallback
return str(message)
def _register_token_callback(self):
"""
Register the appropriate token counter callback with the language model.
This method handles different types of models (LiteLLM, LangChain, direct LLMs)
and different callback structures.
"""
# Skip if we already have a token callback registered
if self.token_callback is not None:
return
# Skip if we don't have a token_process attribute
if not hasattr(self, "token_process"):
return
# Determine if we're using LiteLLM or LangChain based on the agent type
if hasattr(self.langchain_agent, "client") and hasattr(
self.langchain_agent.client, "callbacks"
):
# This is likely a LiteLLM-based agent
self.token_callback = LiteLLMTokenCounter(self.token_process)
# Add our callback to the LLM directly
if isinstance(self.langchain_agent.client.callbacks, list):
if self.token_callback not in self.langchain_agent.client.callbacks:
self.langchain_agent.client.callbacks.append(self.token_callback)
else:
self.langchain_agent.client.callbacks = [self.token_callback]
else:
# This is likely a LangChain-based agent
self.token_callback = LangChainTokenCounter(self.token_process)
# Add callback to the LangChain model
if hasattr(self.langchain_agent, "callbacks"):
if self.langchain_agent.callbacks is None:
self.langchain_agent.callbacks = [self.token_callback]
elif isinstance(self.langchain_agent.callbacks, list):
self.langchain_agent.callbacks.append(self.token_callback)
# For direct LLM models
elif hasattr(self.langchain_agent, "llm") and hasattr(
self.langchain_agent.llm, "callbacks"
):
if self.langchain_agent.llm.callbacks is None:
self.langchain_agent.llm.callbacks = [self.token_callback]
elif isinstance(self.langchain_agent.llm.callbacks, list):
self.langchain_agent.llm.callbacks.append(self.token_callback)
# Direct LLM case
elif not hasattr(self.langchain_agent, "agent"):
# This might be a direct LLM, not an agent
if (
not hasattr(self.langchain_agent, "callbacks")
or self.langchain_agent.callbacks is None
):
self.langchain_agent.callbacks = [self.token_callback]
elif isinstance(self.langchain_agent.callbacks, list):
self.langchain_agent.callbacks.append(self.token_callback)
def execute_task(
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
"""
Execute a task by building the full task prompt (with memory, knowledge, tool instructions,
and training hints) then delegating execution to the wrapped LangChain agent.
If the task requires human input, a feedback loop is run that mimics the CrewAgentExecutor.
"""
task_prompt = task.prompt()
if task.output_json or task.output_pydantic:
# Choose the output format, preferring output_json if available
output_format = (
task.output_json if task.output_json else task.output_pydantic
)
schema = generate_model_description(cast(type, output_format))
instruction = self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
task_prompt += f"\n{instruction}"
if context:
task_prompt = self.i18n.slice("task_with_context").format(
task=task_prompt, context=context
)
if self.crew and self.crew.memory:
from crewai.memory.contextual.contextual_memory import ContextualMemory
contextual_memory = ContextualMemory(
self.crew.memory_config,
self.crew._short_term_memory,
self.crew._long_term_memory,
self.crew._entity_memory,
self.crew._user_memory,
)
memory = contextual_memory.build_context_for_task(task, context)
if memory.strip():
task_prompt += self.i18n.slice("memory").format(memory=memory)
if self.knowledge:
agent_knowledge_snippets = self.knowledge.query([task.prompt()])
if agent_knowledge_snippets:
from crewai.knowledge.utils.knowledge_utils import (
extract_knowledge_context,
)
agent_knowledge_context = extract_knowledge_context(
agent_knowledge_snippets
)
if agent_knowledge_context:
task_prompt += agent_knowledge_context
if self.crew:
knowledge_snippets = self.crew.query_knowledge([task.prompt()])
if knowledge_snippets:
from crewai.knowledge.utils.knowledge_utils import (
extract_knowledge_context,
)
crew_knowledge_context = extract_knowledge_context(knowledge_snippets)
if crew_knowledge_context:
task_prompt += crew_knowledge_context
tools = tools or self.tools or []
self.create_agent_executor(tools=tools)
self._show_start_logs(task)
if self.crew and getattr(self.crew, "_train", False):
task_prompt = self._training_handler(task_prompt=task_prompt)
else:
task_prompt = self._use_trained_data(task_prompt=task_prompt)
# Register token tracking callback
self._register_token_callback()
init_state = {"messages": [("user", task_prompt)]}
# Estimate input tokens for tracking
if hasattr(self, "token_process"):
# Rough estimate based on characters (better than word count)
estimated_prompt_tokens = len(task_prompt) // 4 # ~4 chars per token
self.token_process.sum_prompt_tokens(estimated_prompt_tokens)
state = self.agent_executor.invoke(init_state)
# Extract output from state based on its structure
if "structured_response" in state:
current_output = state["structured_response"]
elif "messages" in state and state["messages"]:
last_message = state["messages"][-1]
current_output = self._extract_text(last_message)
elif "output" in state:
current_output = str(state["output"])
else:
# Fallback to extracting text from the entire state
current_output = self._extract_text(state)
# Estimate completion tokens for tracking if we don't have actual counts
if hasattr(self, "token_process"):
# Rough estimate based on characters
estimated_completion_tokens = len(current_output) // 4 # ~4 chars per token
self.token_process.sum_completion_tokens(estimated_completion_tokens)
self.token_process.sum_successful_requests(1)
if task.human_input:
current_output = self._handle_human_feedback(current_output)
return current_output
def _handle_human_feedback(self, current_output: str) -> str:
"""
Implements a feedback loop that prompts the user for feedback and then instructs
the underlying LangChain agent to regenerate its answer with the requested changes.
Only the inner content of the output is displayed to the user.
"""
while True:
print("\nAgent output:")
# Print only the inner text extracted from current_output.
print(self._extract_text(current_output))
feedback = input("\nEnter your feedback (or press Enter to accept): ")
if not feedback.strip():
break # No feedback provided, exit the loop
extracted_output = self._extract_text(current_output)
new_prompt = (
f"Below is your previous answer:\n"
f"{extracted_output}\n\n"
f"Based on the following feedback: '{feedback}', please regenerate your answer with the requested details. "
f"Specifically, display 10 bullet points in each section. Provide the complete updated answer below.\n\n"
f"Updated answer:"
)
# Estimate input tokens for tracking
if hasattr(self, "token_process"):
# Rough estimate based on characters
estimated_prompt_tokens = len(new_prompt) // 4 # ~4 chars per token
self.token_process.sum_prompt_tokens(estimated_prompt_tokens)
try:
new_state = self.agent_executor.invoke(
{"messages": [("user", new_prompt)]}
)
# Extract output from state based on its structure
if "structured_response" in new_state:
new_output = new_state["structured_response"]
elif "messages" in new_state and new_state["messages"]:
last_message = new_state["messages"][-1]
new_output = self._extract_text(last_message)
elif "output" in new_state:
new_output = str(new_state["output"])
else:
# Fallback to extracting text from the entire state
new_output = self._extract_text(new_state)
# Estimate completion tokens for tracking
if hasattr(self, "token_process"):
# Rough estimate based on characters
estimated_completion_tokens = (
len(new_output) // 4
) # ~4 chars per token
self.token_process.sum_completion_tokens(
estimated_completion_tokens
)
self.token_process.sum_successful_requests(1)
current_output = new_output
except Exception as e:
print("Error during re-invocation with feedback:", e)
break
return current_output
def _generate_model_description(self, model: Any) -> str:
"""
Generates a string description (schema) for the expected output.
This is a placeholder that should call the actual implementation.
"""
from crewai.utilities.converter import generate_model_description
return generate_model_description(model)
def _training_handler(self, task_prompt: str) -> str:
"""
Append training instructions from Crew data to the task prompt.
"""
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.training_handler import CrewTrainingHandler
data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
if data:
agent_id = str(self.id)
if data.get(agent_id):
human_feedbacks = [
i["human_feedback"] for i in data.get(agent_id, {}).values()
]
task_prompt += (
"\n\nYou MUST follow these instructions: \n "
+ "\n - ".join(human_feedbacks)
)
return task_prompt
def _use_trained_data(self, task_prompt: str) -> str:
"""
Append pre-trained instructions from Crew data to the task prompt.
"""
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE
from crewai.utilities.training_handler import CrewTrainingHandler
data = CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).load()
if data and (trained_data_output := data.get(getattr(self, "role", "default"))):
task_prompt += (
"\n\nYou MUST follow these instructions: \n - "
+ "\n - ".join(trained_data_output["suggestions"])
)
return task_prompt
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
"""
Creates an agent executor using LangGraph's create_react_agent if given an LLM,
or uses the provided language model directly.
"""
try:
from langgraph.prebuilt import create_react_agent
except ImportError as e:
raise ImportError(
"LangGraph library not found. Please run `uv add langgraph` to add LangGraph support."
) from e
# Ensure raw_tools is always a list.
raw_tools: List[Any] = (
tools
if tools is not None
else (self.tools if self.tools is not None else [])
)
# Fallback: if raw_tools is still empty, try to extract them from the wrapped LangChain agent.
if not raw_tools:
if hasattr(self.langchain_agent, "agent") and hasattr(
self.langchain_agent.agent, "tools"
):
raw_tools = self.langchain_agent.agent.tools or []
else:
raw_tools = getattr(self.langchain_agent, "tools", []) or []
used_tools = []
# Use the global CrewAI Tool class (imported at the module level)
for tool in raw_tools:
# If the tool is a CrewAI Tool, convert it to a LangChain compatible tool.
if isinstance(tool, Tool):
used_tools.append(tool.to_langchain())
else:
used_tools.append(tool)
# Sanitize the agent's role for the "name" field. The allowed pattern is ^[a-zA-Z0-9_-]+$
import re
agent_role = getattr(self, "role", "agent")
sanitized_role = re.sub(r"\s+", "_", agent_role)
# Register token tracking callback
self._register_token_callback()
self.agent_executor = create_react_agent(
model=self.langchain_agent,
tools=used_tools,
debug=getattr(self, "verbose", False),
name=sanitized_role,
)
def _parse_tools(self, tools: List[BaseTool]) -> List[BaseTool]:
return tools
def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[BaseTool]:
return []
def get_output_converter(
self,
llm: Any,
text: str,
model: Optional[Type] = None,
instructions: str = "",
) -> Converter:
return Converter(llm=llm, text=text, model=model, instructions=instructions)
def _show_start_logs(self, task: Task) -> None:
if self.langchain_agent is None:
raise ValueError("Agent cannot be None")
# Check if the adapter or its crew is in verbose mode.
verbose = self.verbose or (self.crew and getattr(self.crew, "verbose", False))
if verbose:
from crewai.utilities import Printer
printer = Printer()
# Use the adapter's role (inherited from BaseAgent) for logging.
printer.print(
content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{self.role}\033[00m"
)
description = getattr(task, "description", "Not Found")
printer.print(
content=f"\033[95m## Task:\033[00m \033[92m{description}\033[00m"
)

View File

@@ -124,14 +124,15 @@ class CrewAgentParser:
)
def _extract_thought(self, text: str) -> str:
regex = r"(.*?)(?:\n\nAction|\n\nFinal Answer)"
thought_match = re.search(regex, text, re.DOTALL)
if thought_match:
thought = thought_match.group(1).strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
return ""
thought_index = text.find("\n\nAction")
if thought_index == -1:
thought_index = text.find("\n\nFinal Answer")
if thought_index == -1:
return ""
thought = text[:thought_index].strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
def _clean_action(self, text: str) -> str:
"""Clean action string by removing non-essential formatting characters."""

View File

@@ -203,7 +203,6 @@ def install(context):
@crewai.command()
def run():
"""Run the Crew."""
click.echo("Running the Crew")
run_crew()

View File

@@ -216,10 +216,43 @@ MODELS = {
"watsonx/ibm/granite-3-8b-instruct",
],
"bedrock": [
"bedrock/us.amazon.nova-pro-v1:0",
"bedrock/us.amazon.nova-micro-v1:0",
"bedrock/us.amazon.nova-lite-v1:0",
"bedrock/us.anthropic.claude-3-5-sonnet-20240620-v1:0",
"bedrock/us.anthropic.claude-3-5-haiku-20241022-v1:0",
"bedrock/us.anthropic.claude-3-5-sonnet-20241022-v2:0",
"bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
"bedrock/us.anthropic.claude-3-sonnet-20240229-v1:0",
"bedrock/us.anthropic.claude-3-opus-20240229-v1:0",
"bedrock/us.anthropic.claude-3-haiku-20240307-v1:0",
"bedrock/us.meta.llama3-2-11b-instruct-v1:0",
"bedrock/us.meta.llama3-2-3b-instruct-v1:0",
"bedrock/us.meta.llama3-2-90b-instruct-v1:0",
"bedrock/us.meta.llama3-2-1b-instruct-v1:0",
"bedrock/us.meta.llama3-1-8b-instruct-v1:0",
"bedrock/us.meta.llama3-1-70b-instruct-v1:0",
"bedrock/us.meta.llama3-3-70b-instruct-v1:0",
"bedrock/us.meta.llama3-1-405b-instruct-v1:0",
"bedrock/eu.anthropic.claude-3-5-sonnet-20240620-v1:0",
"bedrock/eu.anthropic.claude-3-sonnet-20240229-v1:0",
"bedrock/eu.anthropic.claude-3-haiku-20240307-v1:0",
"bedrock/eu.meta.llama3-2-3b-instruct-v1:0",
"bedrock/eu.meta.llama3-2-1b-instruct-v1:0",
"bedrock/apac.anthropic.claude-3-5-sonnet-20240620-v1:0",
"bedrock/apac.anthropic.claude-3-5-sonnet-20241022-v2:0",
"bedrock/apac.anthropic.claude-3-sonnet-20240229-v1:0",
"bedrock/apac.anthropic.claude-3-haiku-20240307-v1:0",
"bedrock/amazon.nova-pro-v1:0",
"bedrock/amazon.nova-micro-v1:0",
"bedrock/amazon.nova-lite-v1:0",
"bedrock/anthropic.claude-3-5-sonnet-20240620-v1:0",
"bedrock/anthropic.claude-3-5-haiku-20241022-v1:0",
"bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0",
"bedrock/anthropic.claude-3-7-sonnet-20250219-v1:0",
"bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
"bedrock/anthropic.claude-3-haiku-20240307-v1:0",
"bedrock/anthropic.claude-3-opus-20240229-v1:0",
"bedrock/anthropic.claude-3-haiku-20240307-v1:0",
"bedrock/anthropic.claude-v2:1",
"bedrock/anthropic.claude-v2",
"bedrock/anthropic.claude-instant-v1",
@@ -234,8 +267,6 @@ MODELS = {
"bedrock/ai21.j2-mid-v1",
"bedrock/ai21.j2-ultra-v1",
"bedrock/ai21.jamba-instruct-v1:0",
"bedrock/meta.llama2-13b-chat-v1",
"bedrock/meta.llama2-70b-chat-v1",
"bedrock/mistral.mistral-7b-instruct-v0:2",
"bedrock/mistral.mixtral-8x7b-instruct-v0:1",
],

View File

@@ -1,4 +1,6 @@
import subprocess
from enum import Enum
from typing import List, Optional
import click
from packaging import version
@@ -7,16 +9,24 @@ from crewai.cli.utils import read_toml
from crewai.cli.version import get_crewai_version
class CrewType(Enum):
STANDARD = "standard"
FLOW = "flow"
def run_crew() -> None:
"""
Run the crew by running a command in the UV environment.
Run the crew or flow by running a command in the UV environment.
Starting from version 0.103.0, this command can be used to run both
standard crews and flows. For flows, it detects the type from pyproject.toml
and automatically runs the appropriate command.
"""
command = ["uv", "run", "run_crew"]
crewai_version = get_crewai_version()
min_required_version = "0.71.0"
pyproject_data = read_toml()
# Check for legacy poetry configuration
if pyproject_data.get("tool", {}).get("poetry") and (
version.parse(crewai_version) < version.parse(min_required_version)
):
@@ -26,18 +36,54 @@ def run_crew() -> None:
fg="red",
)
# Determine crew type
is_flow = pyproject_data.get("tool", {}).get("crewai", {}).get("type") == "flow"
crew_type = CrewType.FLOW if is_flow else CrewType.STANDARD
# Display appropriate message
click.echo(f"Running the {'Flow' if is_flow else 'Crew'}")
# Execute the appropriate command
execute_command(crew_type)
def execute_command(crew_type: CrewType) -> None:
"""
Execute the appropriate command based on crew type.
Args:
crew_type: The type of crew to run
"""
command = ["uv", "run", "kickoff" if crew_type == CrewType.FLOW else "run_crew"]
try:
subprocess.run(command, capture_output=False, text=True, check=True)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while running the crew: {e}", err=True)
click.echo(e.output, err=True, nl=True)
if pyproject_data.get("tool", {}).get("poetry"):
click.secho(
"It's possible that you are using an old version of crewAI that uses poetry, please run `crewai update` to update your pyproject.toml to use uv.",
fg="yellow",
)
handle_error(e, crew_type)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
def handle_error(error: subprocess.CalledProcessError, crew_type: CrewType) -> None:
"""
Handle subprocess errors with appropriate messaging.
Args:
error: The subprocess error that occurred
crew_type: The type of crew that was being run
"""
entity_type = "flow" if crew_type == CrewType.FLOW else "crew"
click.echo(f"An error occurred while running the {entity_type}: {error}", err=True)
if error.output:
click.echo(error.output, err=True, nl=True)
pyproject_data = read_toml()
if pyproject_data.get("tool", {}).get("poetry"):
click.secho(
"It's possible that you are using an old version of crewAI that uses poetry, "
"please run `crewai update` to update your pyproject.toml to use uv.",
fg="yellow",
)

View File

@@ -30,13 +30,13 @@ crewai install
## Running the Project
To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project:
To kickstart your flow and begin execution, run this from the root folder of your project:
```bash
crewai run
```
This command initializes the {{name}} Crew, assembling the agents and assigning them tasks as defined in your configuration.
This command initializes the {{name}} Flow as defined in your configuration.
This example, unmodified, will run the create a `report.md` file with the output of a research on LLMs in the root folder.

View File

@@ -257,11 +257,11 @@ def get_crew(crew_path: str = "crew.py", require: bool = False) -> Crew | None:
import os
for root, _, files in os.walk("."):
if "crew.py" in files:
crew_path = os.path.join(root, "crew.py")
if crew_path in files:
crew_os_path = os.path.join(root, crew_path)
try:
spec = importlib.util.spec_from_file_location(
"crew_module", crew_path
"crew_module", crew_os_path
)
if not spec or not spec.loader:
continue
@@ -273,9 +273,11 @@ def get_crew(crew_path: str = "crew.py", require: bool = False) -> Crew | None:
for attr_name in dir(module):
attr = getattr(module, attr_name)
try:
if callable(attr) and hasattr(attr, "crew"):
crew_instance = attr().crew()
return crew_instance
if isinstance(attr, Crew) and hasattr(attr, "kickoff"):
print(
f"Found valid crew object in attribute '{attr_name}' at {crew_os_path}."
)
return attr
except Exception as e:
print(f"Error processing attribute {attr_name}: {e}")

View File

@@ -35,10 +35,8 @@ from crewai.process import Process
from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import Tool
from crewai.traces.unified_trace_controller import init_crew_main_trace
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -96,7 +94,7 @@ class Crew(BaseModel):
__hash__ = object.__hash__ # type: ignore
_execution_span: Any = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr()
_rpm_controller: Optional[RPMController] = PrivateAttr()
_logger: Logger = PrivateAttr()
_file_handler: FileHandler = PrivateAttr()
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
@@ -250,7 +248,6 @@ class Crew(BaseModel):
@model_validator(mode="after")
def set_private_attrs(self) -> "Crew":
"""Set private attributes."""
self._cache_handler = CacheHandler()
self._logger = Logger(verbose=self.verbose)
if self.output_log_file:
self._file_handler = FileHandler(self.output_log_file)
@@ -258,8 +255,24 @@ class Crew(BaseModel):
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = create_llm(self.function_calling_llm)
self._telemetry = Telemetry()
self._telemetry.set_tracer()
return self
@model_validator(mode="after")
def initialize_dependencies(self) -> "Crew":
# Always create a cache handler, but it will only be used if self.cache is True
# Create the Crew-level RPM controller if a max RPM is specified
if self.max_rpm is not None:
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=Logger(verbose=self.verbose)
)
else:
self._rpm_controller = None
# Now inject these external dependencies into each agent
for agent in self.agents:
agent.crew = self # ensure the agent's crew reference is set
agent.configure_executor(self._cache_handler, self._rpm_controller)
return self
@model_validator(mode="after")
@@ -361,10 +374,7 @@ class Crew(BaseModel):
if self.agents:
for agent in self.agents:
if self.cache:
agent.set_cache_handler(self._cache_handler)
if self.max_rpm:
agent.set_rpm_controller(self._rpm_controller)
agent.configure_executor(self._cache_handler, self._rpm_controller)
return self
@model_validator(mode="after")
@@ -574,7 +584,6 @@ class Crew(BaseModel):
CrewTrainingHandler(filename).clear()
raise
@init_crew_main_trace
def kickoff(
self,
inputs: Optional[Dict[str, Any]] = None,
@@ -605,6 +614,7 @@ class Crew(BaseModel):
agent.i18n = i18n
# type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
agent.crew = self # type: ignore[attr-defined]
agent.set_knowledge(crew_embedder=self.embedder)
# TODO: Create an AgentFunctionCalling protocol for future refactoring
if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
agent.function_calling_llm = self.function_calling_llm # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
@@ -631,7 +641,7 @@ class Crew(BaseModel):
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
metrics += [agent._token_process.get_summary() for agent in self.agents]
metrics += [agent.token_process.get_summary() for agent in self.agents]
self.usage_metrics = UsageMetrics()
for metric in metrics:
@@ -1115,7 +1125,6 @@ class Crew(BaseModel):
"_short_term_memory",
"_long_term_memory",
"_entity_memory",
"_telemetry",
"agents",
"tasks",
"knowledge_sources",
@@ -1179,19 +1188,22 @@ class Crew(BaseModel):
agent.interpolate_inputs(inputs)
def _finish_execution(self, final_string_output: str) -> None:
if self.max_rpm:
if self._rpm_controller:
self._rpm_controller.stop_rpm_counter()
def calculate_usage_metrics(self) -> UsageMetrics:
"""Calculates and returns the usage metrics."""
total_usage_metrics = UsageMetrics()
for agent in self.agents:
if hasattr(agent, "_token_process"):
token_sum = agent._token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
token_sum = self.manager_agent._token_process.get_summary()
# Directly access token_process since it's now a field in BaseAgent
token_sum = agent.token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
if self.manager_agent:
# Directly access token_process since it's now a field in BaseAgent
token_sum = self.manager_agent.token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
self.usage_metrics = total_usage_metrics
return total_usage_metrics
@@ -1278,11 +1290,11 @@ class Crew(BaseModel):
def _reset_all_memories(self) -> None:
"""Reset all available memory systems."""
memory_systems = [
("short term", self._short_term_memory),
("entity", self._entity_memory),
("long term", self._long_term_memory),
("task output", self._task_output_handler),
("knowledge", self.knowledge),
("short term", getattr(self, "_short_term_memory", None)),
("entity", getattr(self, "_entity_memory", None)),
("long term", getattr(self, "_long_term_memory", None)),
("task output", getattr(self, "_task_output_handler", None)),
("knowledge", getattr(self, "knowledge", None)),
]
for name, system in memory_systems:

View File

@@ -22,10 +22,6 @@ from pydantic import BaseModel, Field, ValidationError
from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.utils import get_possible_return_constants
from crewai.traces.unified_trace_controller import (
init_flow_main_trace,
trace_flow_step,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.flow_events import (
FlowCreatedEvent,
@@ -713,16 +709,34 @@ class Flow(Generic[T], metaclass=FlowMeta):
raise TypeError(f"State must be dict or BaseModel, got {type(self._state)}")
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""Start the flow execution.
"""
Start the flow execution in a synchronous context.
This method wraps kickoff_async so that all state initialization and event
emission is handled in the asynchronous method.
"""
async def run_flow():
return await self.kickoff_async(inputs)
return asyncio.run(run_flow())
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""
Start the flow execution asynchronously.
This method performs state restoration (if an 'id' is provided and persistence is available)
and updates the flow state with any additional inputs. It then emits the FlowStartedEvent,
logs the flow startup, and executes all start methods. Once completed, it emits the
FlowFinishedEvent and returns the final output.
Args:
inputs: Optional dictionary containing input values and potentially a state ID to restore
"""
# Handle state restoration if ID is provided in inputs
if inputs and "id" in inputs and self._persistence is not None:
restore_uuid = inputs["id"]
stored_state = self._persistence.load_state(restore_uuid)
inputs: Optional dictionary containing input values and/or a state ID for restoration.
Returns:
The final output from the flow, which is the result of the last executed method.
"""
if inputs:
# Override the id in the state if it exists in inputs
if "id" in inputs:
if isinstance(self._state, dict):
@@ -730,24 +744,27 @@ class Flow(Generic[T], metaclass=FlowMeta):
elif isinstance(self._state, BaseModel):
setattr(self._state, "id", inputs["id"])
if stored_state:
self._log_flow_event(
f"Loading flow state from memory for UUID: {restore_uuid}",
color="yellow",
)
# Restore the state
self._restore_state(stored_state)
else:
self._log_flow_event(
f"No flow state found for UUID: {restore_uuid}", color="red"
)
# If persistence is enabled, attempt to restore the stored state using the provided id.
if "id" in inputs and self._persistence is not None:
restore_uuid = inputs["id"]
stored_state = self._persistence.load_state(restore_uuid)
if stored_state:
self._log_flow_event(
f"Loading flow state from memory for UUID: {restore_uuid}",
color="yellow",
)
self._restore_state(stored_state)
else:
self._log_flow_event(
f"No flow state found for UUID: {restore_uuid}", color="red"
)
# Apply any additional inputs after restoration
# Update state with any additional inputs (ignoring the 'id' key)
filtered_inputs = {k: v for k, v in inputs.items() if k != "id"}
if filtered_inputs:
self._initialize_state(filtered_inputs)
# Start flow execution
# Emit FlowStartedEvent and log the start of the flow.
crewai_event_bus.emit(
self,
FlowStartedEvent(
@@ -763,16 +780,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
async def run_flow():
return await self.kickoff_async()
return asyncio.run(run_flow())
@init_flow_main_trace
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
if not self._start_methods:
raise ValueError("No start method defined")
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
@@ -789,6 +796,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
result=final_output,
),
)
return final_output
async def _execute_start_method(self, start_method_name: str) -> None:
@@ -814,7 +822,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
await self._execute_listeners(start_method_name, result)
@trace_flow_step
async def _execute_method(
self, method_name: str, method: Callable, *args: Any, **kwargs: Any
) -> Any:
@@ -887,35 +894,45 @@ class Flow(Generic[T], metaclass=FlowMeta):
Notes
-----
- Routers are executed sequentially to maintain flow control
- Each router's result becomes the new trigger_method
- Each router's result becomes a new trigger_method
- Normal listeners are executed in parallel for efficiency
- Listeners can receive the trigger method's result as a parameter
"""
# First, handle routers repeatedly until no router triggers anymore
router_results = []
current_trigger = trigger_method
while True:
routers_triggered = self._find_triggered_methods(
trigger_method, router_only=True
current_trigger, router_only=True
)
if not routers_triggered:
break
for router_name in routers_triggered:
await self._execute_single_listener(router_name, result)
# After executing router, the router's result is the path
# The last router executed sets the trigger_method
# The router result is the last element in self._method_outputs
trigger_method = self._method_outputs[-1]
router_result = self._method_outputs[-1]
if router_result: # Only add non-None results
router_results.append(router_result)
current_trigger = (
router_result # Update for next iteration of router chain
)
# Now that no more routers are triggered by current trigger_method,
# execute normal listeners
listeners_triggered = self._find_triggered_methods(
trigger_method, router_only=False
)
if listeners_triggered:
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
# Now execute normal listeners for all router results and the original trigger
all_triggers = [trigger_method] + router_results
for current_trigger in all_triggers:
if current_trigger: # Skip None results
listeners_triggered = self._find_triggered_methods(
current_trigger, router_only=False
)
if listeners_triggered:
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
def _find_triggered_methods(
self, trigger_method: str, router_only: bool

View File

@@ -4,7 +4,7 @@ SQLite-based implementation of flow state persistence.
import json
import sqlite3
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, Union
@@ -34,6 +34,7 @@ class SQLiteFlowPersistence(FlowPersistence):
ValueError: If db_path is invalid
"""
from crewai.utilities.paths import db_storage_path
# Get path from argument or default location
path = db_path or str(Path(db_storage_path()) / "flow_states.db")
@@ -46,7 +47,8 @@ class SQLiteFlowPersistence(FlowPersistence):
def init_db(self) -> None:
"""Create the necessary tables if they don't exist."""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
conn.execute(
"""
CREATE TABLE IF NOT EXISTS flow_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
flow_uuid TEXT NOT NULL,
@@ -54,12 +56,15 @@ class SQLiteFlowPersistence(FlowPersistence):
timestamp DATETIME NOT NULL,
state_json TEXT NOT NULL
)
""")
"""
)
# Add index for faster UUID lookups
conn.execute("""
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_flow_states_uuid
ON flow_states(flow_uuid)
""")
"""
)
def save_state(
self,
@@ -85,19 +90,22 @@ class SQLiteFlowPersistence(FlowPersistence):
)
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
conn.execute(
"""
INSERT INTO flow_states (
flow_uuid,
method_name,
timestamp,
state_json
) VALUES (?, ?, ?, ?)
""", (
flow_uuid,
method_name,
datetime.utcnow().isoformat(),
json.dumps(state_dict),
))
""",
(
flow_uuid,
method_name,
datetime.now(timezone.utc).isoformat(),
json.dumps(state_dict),
),
)
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
"""Load the most recent state for a given flow UUID.
@@ -109,13 +117,16 @@ class SQLiteFlowPersistence(FlowPersistence):
The most recent state as a dictionary, or None if no state exists
"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("""
cursor = conn.execute(
"""
SELECT state_json
FROM flow_states
WHERE flow_uuid = ?
ORDER BY id DESC
LIMIT 1
""", (flow_uuid,))
""",
(flow_uuid,),
)
row = cursor.fetchone()
if row:

View File

@@ -16,7 +16,8 @@ Example
import ast
import inspect
import textwrap
from typing import Any, Dict, List, Optional, Set, Union
from collections import defaultdict, deque
from typing import Any, Deque, Dict, List, Optional, Set, Union
def get_possible_return_constants(function: Any) -> Optional[List[str]]:
@@ -118,7 +119,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
- Processes router paths separately
"""
levels: Dict[str, int] = {}
queue: List[str] = []
queue: Deque[str] = deque()
visited: Set[str] = set()
pending_and_listeners: Dict[str, Set[str]] = {}
@@ -128,28 +129,35 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
levels[method_name] = 0
queue.append(method_name)
# Precompute listener dependencies
or_listeners = defaultdict(list)
and_listeners = defaultdict(set)
for listener_name, (condition_type, trigger_methods) in flow._listeners.items():
if condition_type == "OR":
for method in trigger_methods:
or_listeners[method].append(listener_name)
elif condition_type == "AND":
and_listeners[listener_name] = set(trigger_methods)
# Breadth-first traversal to assign levels
while queue:
current = queue.pop(0)
current = queue.popleft()
current_level = levels[current]
visited.add(current)
for listener_name, (condition_type, trigger_methods) in flow._listeners.items():
if condition_type == "OR":
if current in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
elif condition_type == "AND":
for listener_name in or_listeners[current]:
if listener_name not in levels or levels[listener_name] > current_level + 1:
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
for listener_name, required_methods in and_listeners.items():
if current in required_methods:
if listener_name not in pending_and_listeners:
pending_and_listeners[listener_name] = set()
if current in trigger_methods:
pending_and_listeners[listener_name].add(current)
if set(trigger_methods) == pending_and_listeners[listener_name]:
pending_and_listeners[listener_name].add(current)
if required_methods == pending_and_listeners[listener_name]:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
@@ -159,22 +167,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
queue.append(listener_name)
# Handle router connections
if current in flow._routers:
router_method_name = current
paths = flow._router_paths.get(router_method_name, [])
for path in paths:
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if path in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
process_router_paths(flow, current, current_level, levels, queue)
return levels
@@ -227,10 +220,7 @@ def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
def dfs_ancestors(
node: str,
ancestors: Dict[str, Set[str]],
visited: Set[str],
flow: Any
node: str, ancestors: Dict[str, Set[str]], visited: Set[str], flow: Any
) -> None:
"""
Perform depth-first search to build ancestor relationships.
@@ -274,7 +264,9 @@ def dfs_ancestors(
dfs_ancestors(listener_name, ancestors, visited, flow)
def is_ancestor(node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]) -> bool:
def is_ancestor(
node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]
) -> bool:
"""
Check if one node is an ancestor of another.
@@ -339,7 +331,9 @@ def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
return parent_children
def get_child_index(parent: str, child: str, parent_children: Dict[str, List[str]]) -> int:
def get_child_index(
parent: str, child: str, parent_children: Dict[str, List[str]]
) -> int:
"""
Get the index of a child node in its parent's sorted children list.
@@ -360,3 +354,23 @@ def get_child_index(parent: str, child: str, parent_children: Dict[str, List[str
children = parent_children.get(parent, [])
children.sort()
return children.index(child)
def process_router_paths(flow, current, current_level, levels, queue):
"""
Handle the router connections for the current node.
"""
if current in flow._routers:
paths = flow._router_paths.get(current, [])
for path in paths:
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if path in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
queue.append(listener_name)

View File

@@ -1,4 +1,3 @@
import inspect
import json
import logging
import os
@@ -6,37 +5,31 @@ import sys
import threading
import warnings
from contextlib import contextmanager
from typing import (
Any,
Dict,
List,
Literal,
Optional,
Tuple,
Type,
Union,
cast,
)
from typing import Any, Dict, List, Literal, Optional, Type, Union, cast
from dotenv import load_dotenv
from pydantic import BaseModel
from crewai.utilities.events.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMCallType,
)
from crewai.utilities.events.tool_usage_events import ToolExecutionErrorEvent
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
import litellm
from litellm import Choices, get_supported_openai_params
from litellm import Choices
from litellm.types.utils import ModelResponse
from litellm.utils import supports_response_schema
from litellm.utils import get_supported_openai_params, supports_response_schema
from crewai.traces.unified_trace_controller import trace_llm_call
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from crewai.utilities.protocols import AgentExecutorProtocol
load_dotenv()
@@ -71,6 +64,7 @@ LLM_CONTEXT_WINDOW_SIZES = {
"gpt-4-turbo": 128000,
"o1-preview": 128000,
"o1-mini": 128000,
"o3-mini": 200000, # Based on official o3-mini specifications
# gemini
"gemini-2.0-flash": 1048576,
"gemini-1.5-pro": 2097152,
@@ -180,7 +174,6 @@ class LLM:
self.context_window_size = 0
self.reasoning_effort = reasoning_effort
self.additional_params = kwargs
self._message_history: List[Dict[str, str]] = []
self.is_anthropic = self._is_anthropic_model(model)
litellm.drop_params = True
@@ -196,12 +189,6 @@ class LLM:
self.set_callbacks(callbacks)
self.set_env_callbacks()
@trace_llm_call
def _call_llm(self, params: Dict[str, Any]) -> Any:
with suppress_warnings():
response = litellm.completion(**params)
return response
def _is_anthropic_model(self, model: str) -> bool:
"""Determine if the model is from Anthropic provider.
@@ -259,6 +246,15 @@ class LLM:
>>> print(response)
"The capital of France is Paris."
"""
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
),
)
# Validate parameters before proceeding with the call.
self._validate_call_params()
@@ -311,7 +307,7 @@ class LLM:
params = {k: v for k, v in params.items() if v is not None}
# --- 2) Make the completion call
response = self._call_llm(params)
response = litellm.completion(**params)
response_message = cast(Choices, cast(ModelResponse, response).choices)[
0
].message
@@ -333,12 +329,13 @@ class LLM:
# --- 4) If no tool calls, return the text response
if not tool_calls or not available_functions:
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL)
return text_response
# --- 5) Handle the tool call
tool_call = tool_calls[0]
function_name = tool_call.function.name
print("function_name", function_name)
if function_name in available_functions:
try:
function_args = json.loads(tool_call.function.arguments)
@@ -350,6 +347,7 @@ class LLM:
try:
# Call the actual tool function
result = fn(**function_args)
self._handle_emit_call_events(result, LLMCallType.TOOL_CALL)
return result
except Exception as e:
@@ -365,6 +363,12 @@ class LLM:
error=str(e),
),
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=f"Tool execution error: {str(e)}"
),
)
return text_response
else:
@@ -374,12 +378,28 @@ class LLM:
return text_response
except Exception as e:
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=str(e)),
)
if not LLMContextLengthExceededException(
str(e)
)._is_context_limit_error(str(e)):
logging.error(f"LiteLLM call failed: {str(e)}")
raise
def _handle_emit_call_events(self, response: Any, call_type: LLMCallType):
"""Handle the events for the LLM call.
Args:
response (str): The response from the LLM call.
call_type (str): The type of call, either "tool_call" or "llm_call".
"""
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(response=response, call_type=call_type),
)
def _format_messages_for_provider(
self, messages: List[Dict[str, str]]
) -> List[Dict[str, str]]:
@@ -449,7 +469,7 @@ class LLM:
def supports_function_calling(self) -> bool:
try:
params = get_supported_openai_params(model=self.model)
return "response_format" in params
return params is not None and "tools" in params
except Exception as e:
logging.error(f"Failed to get supported params: {str(e)}")
return False
@@ -457,7 +477,7 @@ class LLM:
def supports_stop_words(self) -> bool:
try:
params = get_supported_openai_params(model=self.model)
return "stop" in params
return params is not None and "stop" in params
except Exception as e:
logging.error(f"Failed to get supported params: {str(e)}")
return False
@@ -466,10 +486,23 @@ class LLM:
"""
Returns the context window size, using 75% of the maximum to avoid
cutting off messages mid-thread.
Raises:
ValueError: If a model's context window size is outside valid bounds (1024-2097152)
"""
if self.context_window_size != 0:
return self.context_window_size
MIN_CONTEXT = 1024
MAX_CONTEXT = 2097152 # Current max from gemini-1.5-pro
# Validate all context window sizes
for key, value in LLM_CONTEXT_WINDOW_SIZES.items():
if value < MIN_CONTEXT or value > MAX_CONTEXT:
raise ValueError(
f"Context window for {key} must be between {MIN_CONTEXT} and {MAX_CONTEXT}"
)
self.context_window_size = int(
DEFAULT_CONTEXT_WINDOW_SIZE * CONTEXT_WINDOW_USAGE_RATIO
)
@@ -531,95 +564,3 @@ class LLM:
litellm.success_callback = success_callbacks
litellm.failure_callback = failure_callbacks
def _get_execution_context(self) -> Tuple[Optional[Any], Optional[Any]]:
"""Get the agent and task from the execution context.
Returns:
tuple: (agent, task) from any AgentExecutor context, or (None, None) if not found
"""
frame = inspect.currentframe()
caller_frame = frame.f_back if frame else None
agent = None
task = None
# Add a maximum depth to prevent infinite loops
max_depth = 100 # Reasonable limit for call stack depth
current_depth = 0
while caller_frame and current_depth < max_depth:
if "self" in caller_frame.f_locals:
caller_self = caller_frame.f_locals["self"]
if isinstance(caller_self, AgentExecutorProtocol):
agent = caller_self.agent
task = caller_self.task
break
caller_frame = caller_frame.f_back
current_depth += 1
return agent, task
def _get_new_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Get only the new messages that haven't been processed before."""
if not hasattr(self, "_message_history"):
self._message_history = []
new_messages = []
for message in messages:
message_key = (message["role"], message["content"])
if message_key not in [
(m["role"], m["content"]) for m in self._message_history
]:
new_messages.append(message)
self._message_history.append(message)
return new_messages
def _get_new_tool_results(self, agent) -> List[Dict]:
"""Get only the new tool results that haven't been processed before."""
if not agent or not agent.tools_results:
return []
if not hasattr(self, "_tool_results_history"):
self._tool_results_history: List[Dict] = []
new_tool_results = []
for result in agent.tools_results:
# Process tool arguments to extract actual values
processed_args = {}
if isinstance(result["tool_args"], dict):
for key, value in result["tool_args"].items():
if isinstance(value, dict) and "type" in value:
# Skip metadata and just store the actual value
continue
processed_args[key] = value
# Create a clean result with processed arguments
clean_result = {
"tool_name": result["tool_name"],
"tool_args": processed_args,
"result": result["result"],
"content": result.get("content", ""),
"start_time": result.get("start_time", ""),
}
# Check if this exact tool execution exists in history
is_duplicate = False
for history_result in self._tool_results_history:
if (
clean_result["tool_name"] == history_result["tool_name"]
and str(clean_result["tool_args"])
== str(history_result["tool_args"])
and str(clean_result["result"]) == str(history_result["result"])
and clean_result["content"] == history_result.get("content", "")
and clean_result["start_time"]
== history_result.get("start_time", "")
):
is_duplicate = True
break
if not is_duplicate:
new_tool_results.append(clean_result)
self._tool_results_history.append(clean_result)
return new_tool_results

View File

@@ -1,7 +1,7 @@
import warnings
from abc import ABC, abstractmethod
from inspect import signature
from typing import Any, Callable, Type, get_args, get_origin
from typing import Any, Callable, Optional, Type, get_args, get_origin
from pydantic import (
BaseModel,
@@ -19,11 +19,21 @@ from crewai.tools.structured_tool import CrewStructuredTool
warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20)
# Define a helper function with an explicit signature
def default_cache_function(
_args: Optional[Any] = None, _result: Optional[Any] = None
) -> bool:
return True
class BaseTool(BaseModel, ABC):
class _ArgsSchemaPlaceholder(PydanticBaseModel):
pass
model_config = ConfigDict()
model_config = ConfigDict(
arbitrary_types_allowed=True,
from_attributes=True, # Allow conversion from ORM objects
)
name: str
"""The unique name of the tool that clearly communicates its purpose."""
@@ -33,8 +43,10 @@ class BaseTool(BaseModel, ABC):
"""The schema for the arguments that the tool accepts."""
description_updated: bool = False
"""Flag to check if the description has been updated."""
cache_function: Callable = lambda _args=None, _result=None: True
"""Function that will be used to determine if the tool should be cached, should return a boolean. If None, the tool will be cached."""
cache_function: Callable[[Optional[Any], Optional[Any]], bool] = (
default_cache_function
)
"""Function used to determine if the tool should be cached."""
result_as_answer: bool = False
"""Flag to check if the tool should be the final agent answer."""
@@ -177,74 +189,43 @@ class BaseTool(BaseModel, ABC):
return origin.__name__
@property
def get(self) -> Callable[[str, Any], Any]:
# Instead of an inline lambda, we define a helper function with explicit types.
def _getter(key: str, default: Any = None) -> Any:
return getattr(self, key, default)
return _getter
class Tool(BaseTool):
"""The function that will be executed when the tool is called."""
"""Tool implementation that requires a function."""
func: Callable
model_config = ConfigDict(
arbitrary_types_allowed=True,
from_attributes=True,
)
def _run(self, *args: Any, **kwargs: Any) -> Any:
return self.func(*args, **kwargs)
@classmethod
def from_langchain(cls, tool: Any) -> "Tool":
"""Create a Tool instance from a CrewStructuredTool.
def to_langchain(self) -> Any:
"""Convert to a LangChain-compatible tool."""
try:
from langchain_core.tools import Tool as LC_Tool
except ImportError:
raise ImportError("langchain_core is not installed")
This method takes a CrewStructuredTool object and converts it into a
Tool instance. It ensures that the provided tool has a callable 'func'
attribute and infers the argument schema if not explicitly provided.
Args:
tool (Any): The CrewStructuredTool object to be converted.
Returns:
Tool: A new Tool instance created from the provided CrewStructuredTool.
Raises:
ValueError: If the provided tool does not have a callable 'func' attribute.
"""
if not hasattr(tool, "func") or not callable(tool.func):
raise ValueError("The provided tool must have a callable 'func' attribute.")
args_schema = getattr(tool, "args_schema", None)
if args_schema is None:
# Infer args_schema from the function signature if not provided
func_signature = signature(tool.func)
annotations = func_signature.parameters
args_fields = {}
for name, param in annotations.items():
if name != "self":
param_annotation = (
param.annotation if param.annotation != param.empty else Any
)
field_info = Field(
default=...,
description="",
)
args_fields[name] = (param_annotation, field_info)
if args_fields:
args_schema = create_model(f"{tool.name}Input", **args_fields)
else:
# Create a default schema with no fields if no parameters are found
args_schema = create_model(
f"{tool.name}Input", __base__=PydanticBaseModel
)
return cls(
name=getattr(tool, "name", "Unnamed Tool"),
description=getattr(tool, "description", ""),
func=tool.func,
args_schema=args_schema,
# Use self._run (which is bound and calls self.func) so that the LC_Tool gets proper attributes.
return LC_Tool(
name=self.name,
description=self.description,
func=self._run,
args_schema=self.args_schema,
)
def to_langchain(
tools: list[BaseTool | CrewStructuredTool],
) -> list[CrewStructuredTool]:
return [t.to_structured_tool() if isinstance(t, BaseTool) else t for t in tools]
def tool(*args):
"""
Decorator to create a tool from a function.

View File

@@ -2,7 +2,6 @@ import ast
import datetime
import json
import time
from datetime import UTC
from difflib import SequenceMatcher
from json import JSONDecodeError
from textwrap import dedent
@@ -118,10 +117,7 @@ class ToolUsage:
self._printer.print(content=f"\n\n{error}\n", color="red")
return error
if (
isinstance(tool, CrewStructuredTool)
and tool.name == self._i18n.tools("add_image")["name"] # type: ignore
):
if isinstance(tool, CrewStructuredTool) and tool.name == self._i18n.tools("add_image")["name"]: # type: ignore
try:
result = self._use(tool_string=tool_string, tool=tool, calling=calling)
return result
@@ -158,7 +154,6 @@ class ToolUsage:
self.task.increment_tools_errors()
started_at = time.time()
started_at_trace = datetime.datetime.now(UTC)
from_cache = False
result = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
@@ -186,9 +181,7 @@ class ToolUsage:
if calling.arguments:
try:
acceptable_args = tool.args_schema.model_json_schema()[
"properties"
].keys() # type: ignore
acceptable_args = tool.args_schema.model_json_schema()["properties"].keys() # type: ignore
arguments = {
k: v
for k, v in calling.arguments.items()
@@ -209,7 +202,7 @@ class ToolUsage:
error=e, tool=tool.name, tool_inputs=tool.description
)
error = ToolUsageErrorException(
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
f'\n{error_message}.\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
).message
self.task.increment_tools_errors()
if self.agent.verbose:
@@ -244,7 +237,6 @@ class ToolUsage:
"result": result,
"tool_name": tool.name,
"tool_args": calling.arguments,
"start_time": started_at_trace,
}
self.on_tool_use_finished(
@@ -388,7 +380,7 @@ class ToolUsage:
raise
else:
return ToolUsageErrorException(
f"{self._i18n.errors('tool_arguments_error')}"
f'{self._i18n.errors("tool_arguments_error")}'
)
if not isinstance(arguments, dict):
@@ -396,7 +388,7 @@ class ToolUsage:
raise
else:
return ToolUsageErrorException(
f"{self._i18n.errors('tool_arguments_error')}"
f'{self._i18n.errors("tool_arguments_error")}'
)
return ToolCalling(
@@ -424,7 +416,7 @@ class ToolUsage:
if self.agent.verbose:
self._printer.print(content=f"\n\n{e}\n", color="red")
return ToolUsageErrorException( # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling")
f"{self._i18n.errors('tool_usage_error').format(error=e)}\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
f'{self._i18n.errors("tool_usage_error").format(error=e)}\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
)
return self._tool_calling(tool_string)

View File

@@ -1,39 +0,0 @@
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Generator
class TraceContext:
"""Maintains the current trace context throughout the execution stack.
This class provides a context manager for tracking trace execution across
async and sync code paths using ContextVars.
"""
_context: ContextVar = ContextVar("trace_context", default=None)
@classmethod
def get_current(cls):
"""Get the current trace context.
Returns:
Optional[UnifiedTraceController]: The current trace controller or None if not set.
"""
return cls._context.get()
@classmethod
@contextmanager
def set_current(cls, trace):
"""Set the current trace context within a context manager.
Args:
trace: The trace controller to set as current.
Yields:
UnifiedTraceController: The current trace controller.
"""
token = cls._context.set(trace)
try:
yield trace
finally:
cls._context.reset(token)

View File

@@ -1,19 +0,0 @@
from enum import Enum
class TraceType(Enum):
LLM_CALL = "llm_call"
TOOL_CALL = "tool_call"
FLOW_STEP = "flow_step"
START_CALL = "start_call"
class RunType(Enum):
KICKOFF = "kickoff"
TRAIN = "train"
TEST = "test"
class CrewType(Enum):
CREW = "crew"
FLOW = "flow"

View File

@@ -1,89 +0,0 @@
from datetime import datetime
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class ToolCall(BaseModel):
"""Model representing a tool call during execution"""
name: str
arguments: Dict[str, Any]
output: str
start_time: datetime
end_time: Optional[datetime] = None
latency_ms: Optional[int] = None
error: Optional[str] = None
class LLMRequest(BaseModel):
"""Model representing the LLM request details"""
model: str
messages: List[Dict[str, str]]
temperature: Optional[float] = None
max_tokens: Optional[int] = None
stop_sequences: Optional[List[str]] = None
additional_params: Dict[str, Any] = Field(default_factory=dict)
class LLMResponse(BaseModel):
"""Model representing the LLM response details"""
content: str
finish_reason: Optional[str] = None
class FlowStepIO(BaseModel):
"""Model representing flow step input/output details"""
function_name: str
inputs: Dict[str, Any] = Field(default_factory=dict)
outputs: Any
metadata: Dict[str, Any] = Field(default_factory=dict)
class CrewTrace(BaseModel):
"""Model for tracking detailed information about LLM interactions and Flow steps"""
deployment_instance_id: Optional[str] = Field(
description="ID of the deployment instance"
)
trace_id: str = Field(description="Unique identifier for this trace")
run_id: str = Field(description="Identifier for the execution run")
agent_role: Optional[str] = Field(description="Role of the agent")
task_id: Optional[str] = Field(description="ID of the current task being executed")
task_name: Optional[str] = Field(description="Name of the current task")
task_description: Optional[str] = Field(
description="Description of the current task"
)
trace_type: str = Field(description="Type of the trace")
crew_type: str = Field(description="Type of the crew")
run_type: str = Field(description="Type of the run")
# Timing information
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
latency_ms: Optional[int] = None
# Request/Response for LLM calls
request: Optional[LLMRequest] = None
response: Optional[LLMResponse] = None
# Input/Output for Flow steps
flow_step: Optional[FlowStepIO] = None
# Tool usage
tool_calls: List[ToolCall] = Field(default_factory=list)
# Metrics
tokens_used: Optional[int] = None
prompt_tokens: Optional[int] = None
completion_tokens: Optional[int] = None
cost: Optional[float] = None
# Additional metadata
status: str = "running" # running, completed, error
error: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
tags: List[str] = Field(default_factory=list)

View File

@@ -1,543 +0,0 @@
import inspect
import os
from datetime import UTC, datetime
from functools import wraps
from typing import Any, Awaitable, Callable, Dict, List, Optional
from uuid import uuid4
from crewai.traces.context import TraceContext
from crewai.traces.enums import CrewType, RunType, TraceType
from crewai.traces.models import (
CrewTrace,
FlowStepIO,
LLMRequest,
LLMResponse,
ToolCall,
)
class UnifiedTraceController:
"""Controls and manages trace execution and recording.
This class handles the lifecycle of traces including creation, execution tracking,
and recording of results for various types of operations (LLM calls, tool calls, flow steps).
"""
_task_traces: Dict[str, List["UnifiedTraceController"]] = {}
def __init__(
self,
trace_type: TraceType,
run_type: RunType,
crew_type: CrewType,
run_id: str,
deployment_instance_id: str = os.environ.get(
"CREWAI_DEPLOYMENT_INSTANCE_ID", ""
),
parent_trace_id: Optional[str] = None,
agent_role: Optional[str] = "unknown",
task_name: Optional[str] = None,
task_description: Optional[str] = None,
task_id: Optional[str] = None,
flow_step: Dict[str, Any] = {},
tool_calls: List[ToolCall] = [],
**context: Any,
) -> None:
"""Initialize a new trace controller.
Args:
trace_type: Type of trace being recorded.
run_type: Type of run being executed.
crew_type: Type of crew executing the trace.
run_id: Unique identifier for the run.
deployment_instance_id: Optional deployment instance identifier.
parent_trace_id: Optional parent trace identifier for nested traces.
agent_role: Role of the agent executing the trace.
task_name: Optional name of the task being executed.
task_description: Optional description of the task.
task_id: Optional unique identifier for the task.
flow_step: Optional flow step information.
tool_calls: Optional list of tool calls made during execution.
**context: Additional context parameters.
"""
self.trace_id = str(uuid4())
self.run_id = run_id
self.parent_trace_id = parent_trace_id
self.trace_type = trace_type
self.run_type = run_type
self.crew_type = crew_type
self.context = context
self.agent_role = agent_role
self.task_name = task_name
self.task_description = task_description
self.task_id = task_id
self.deployment_instance_id = deployment_instance_id
self.children: List[Dict[str, Any]] = []
self.start_time: Optional[datetime] = None
self.end_time: Optional[datetime] = None
self.error: Optional[str] = None
self.tool_calls = tool_calls
self.flow_step = flow_step
self.status: str = "running"
# Add trace to task's trace collection if task_id is present
if task_id:
self._add_to_task_traces()
def _add_to_task_traces(self) -> None:
"""Add this trace to the task's trace collection."""
if not hasattr(UnifiedTraceController, "_task_traces"):
UnifiedTraceController._task_traces = {}
if self.task_id is None:
return
if self.task_id not in UnifiedTraceController._task_traces:
UnifiedTraceController._task_traces[self.task_id] = []
UnifiedTraceController._task_traces[self.task_id].append(self)
@classmethod
def get_task_traces(cls, task_id: str) -> List["UnifiedTraceController"]:
"""Get all traces for a specific task.
Args:
task_id: The ID of the task to get traces for
Returns:
List of traces associated with the task
"""
return cls._task_traces.get(task_id, [])
@classmethod
def clear_task_traces(cls, task_id: str) -> None:
"""Clear traces for a specific task.
Args:
task_id: The ID of the task to clear traces for
"""
if hasattr(cls, "_task_traces") and task_id in cls._task_traces:
del cls._task_traces[task_id]
def _get_current_trace(self) -> "UnifiedTraceController":
return TraceContext.get_current()
def start_trace(self) -> "UnifiedTraceController":
"""Start the trace execution.
Returns:
UnifiedTraceController: Self for method chaining.
"""
self.start_time = datetime.now(UTC)
return self
def end_trace(self, result: Any = None, error: Optional[str] = None) -> None:
"""End the trace execution and record results.
Args:
result: Optional result from the trace execution.
error: Optional error message if the trace failed.
"""
self.end_time = datetime.now(UTC)
self.status = "error" if error else "completed"
self.error = error
self._record_trace(result)
def add_child_trace(self, child_trace: Dict[str, Any]) -> None:
"""Add a child trace to this trace's execution history.
Args:
child_trace: The child trace information to add.
"""
self.children.append(child_trace)
def to_crew_trace(self) -> CrewTrace:
"""Convert to CrewTrace format for storage.
Returns:
CrewTrace: The trace data in CrewTrace format.
"""
latency_ms = None
if self.tool_calls and hasattr(self.tool_calls[0], "start_time"):
self.start_time = self.tool_calls[0].start_time
if self.start_time and self.end_time:
latency_ms = int((self.end_time - self.start_time).total_seconds() * 1000)
request = None
response = None
flow_step_obj = None
if self.trace_type in [TraceType.LLM_CALL, TraceType.TOOL_CALL]:
request = LLMRequest(
model=self.context.get("model", "unknown"),
messages=self.context.get("messages", []),
temperature=self.context.get("temperature"),
max_tokens=self.context.get("max_tokens"),
stop_sequences=self.context.get("stop_sequences"),
)
if "response" in self.context:
response = LLMResponse(
content=self.context["response"].get("content", ""),
finish_reason=self.context["response"].get("finish_reason"),
)
elif self.trace_type == TraceType.FLOW_STEP:
flow_step_obj = FlowStepIO(
function_name=self.flow_step.get("function_name", "unknown"),
inputs=self.flow_step.get("inputs", {}),
outputs={"result": self.context.get("response")},
metadata=self.flow_step.get("metadata", {}),
)
return CrewTrace(
deployment_instance_id=self.deployment_instance_id,
trace_id=self.trace_id,
task_id=self.task_id,
run_id=self.run_id,
agent_role=self.agent_role,
task_name=self.task_name,
task_description=self.task_description,
trace_type=self.trace_type.value,
crew_type=self.crew_type.value,
run_type=self.run_type.value,
start_time=self.start_time,
end_time=self.end_time,
latency_ms=latency_ms,
request=request,
response=response,
flow_step=flow_step_obj,
tool_calls=self.tool_calls,
tokens_used=self.context.get("tokens_used"),
prompt_tokens=self.context.get("prompt_tokens"),
completion_tokens=self.context.get("completion_tokens"),
status=self.status,
error=self.error,
)
def _record_trace(self, result: Any = None) -> None:
"""Record the trace.
This method is called when a trace is completed. It ensures the trace
is properly recorded and associated with its task if applicable.
Args:
result: Optional result to include in the trace
"""
if result:
self.context["response"] = result
# Add to task traces if this trace belongs to a task
if self.task_id:
self._add_to_task_traces()
def should_trace() -> bool:
"""Check if tracing is enabled via environment variable."""
return os.getenv("CREWAI_ENABLE_TRACING", "false").lower() == "true"
# Crew main trace
def init_crew_main_trace(func: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator to initialize and track the main crew execution trace.
This decorator sets up the trace context for the main crew execution,
handling both synchronous and asynchronous crew operations.
Args:
func: The crew function to be traced.
Returns:
Wrapped function that creates and manages the main crew trace context.
"""
@wraps(func)
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
if not should_trace():
return func(self, *args, **kwargs)
trace = build_crew_main_trace(self)
with TraceContext.set_current(trace):
try:
return func(self, *args, **kwargs)
except Exception as e:
trace.end_trace(error=str(e))
raise
return wrapper
def build_crew_main_trace(self: Any) -> "UnifiedTraceController":
"""Build the main trace controller for a crew execution.
This function creates a trace controller configured for the main crew execution,
handling different run types (kickoff, test, train) and maintaining context.
Args:
self: The crew instance.
Returns:
UnifiedTraceController: The configured trace controller for the crew.
"""
run_type = RunType.KICKOFF
if hasattr(self, "_test") and self._test:
run_type = RunType.TEST
elif hasattr(self, "_train") and self._train:
run_type = RunType.TRAIN
current_trace = TraceContext.get_current()
trace = UnifiedTraceController(
trace_type=TraceType.LLM_CALL,
run_type=run_type,
crew_type=current_trace.crew_type if current_trace else CrewType.CREW,
run_id=current_trace.run_id if current_trace else str(self.id),
parent_trace_id=current_trace.trace_id if current_trace else None,
)
return trace
# Flow main trace
def init_flow_main_trace(
func: Callable[..., Awaitable[Any]],
) -> Callable[..., Awaitable[Any]]:
"""Decorator to initialize and track the main flow execution trace.
Args:
func: The async flow function to be traced.
Returns:
Wrapped async function that creates and manages the main flow trace context.
"""
@wraps(func)
async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
if not should_trace():
return await func(self, *args, **kwargs)
trace = build_flow_main_trace(self, *args, **kwargs)
with TraceContext.set_current(trace):
try:
return await func(self, *args, **kwargs)
except Exception:
raise
return wrapper
def build_flow_main_trace(
self: Any, *args: Any, **kwargs: Any
) -> "UnifiedTraceController":
"""Build the main trace controller for a flow execution.
Args:
self: The flow instance.
*args: Variable positional arguments.
**kwargs: Variable keyword arguments.
Returns:
UnifiedTraceController: The configured trace controller for the flow.
"""
current_trace = TraceContext.get_current()
trace = UnifiedTraceController(
trace_type=TraceType.FLOW_STEP,
run_id=current_trace.run_id if current_trace else str(self.flow_id),
parent_trace_id=current_trace.trace_id if current_trace else None,
crew_type=CrewType.FLOW,
run_type=RunType.KICKOFF,
context={
"crew_name": self.__class__.__name__,
"inputs": kwargs.get("inputs", {}),
"agents": [],
"tasks": [],
},
)
return trace
# Flow step trace
def trace_flow_step(
func: Callable[..., Awaitable[Any]],
) -> Callable[..., Awaitable[Any]]:
"""Decorator to trace individual flow step executions.
Args:
func: The async flow step function to be traced.
Returns:
Wrapped async function that creates and manages the flow step trace context.
"""
@wraps(func)
async def wrapper(
self: Any,
method_name: str,
method: Callable[..., Any],
*args: Any,
**kwargs: Any,
) -> Any:
if not should_trace():
return await func(self, method_name, method, *args, **kwargs)
trace = build_flow_step_trace(self, method_name, method, *args, **kwargs)
with TraceContext.set_current(trace):
trace.start_trace()
try:
result = await func(self, method_name, method, *args, **kwargs)
trace.end_trace(result=result)
return result
except Exception as e:
trace.end_trace(error=str(e))
raise
return wrapper
def build_flow_step_trace(
self: Any, method_name: str, method: Callable[..., Any], *args: Any, **kwargs: Any
) -> "UnifiedTraceController":
"""Build a trace controller for an individual flow step.
Args:
self: The flow instance.
method_name: Name of the method being executed.
method: The actual method being executed.
*args: Variable positional arguments.
**kwargs: Variable keyword arguments.
Returns:
UnifiedTraceController: The configured trace controller for the flow step.
"""
current_trace = TraceContext.get_current()
# Get method signature
sig = inspect.signature(method)
params = list(sig.parameters.values())
# Create inputs dictionary mapping parameter names to values
method_params = [p for p in params if p.name != "self"]
inputs: Dict[str, Any] = {}
# Map positional args to their parameter names
for i, param in enumerate(method_params):
if i < len(args):
inputs[param.name] = args[i]
# Add keyword arguments
inputs.update(kwargs)
trace = UnifiedTraceController(
trace_type=TraceType.FLOW_STEP,
run_type=current_trace.run_type if current_trace else RunType.KICKOFF,
crew_type=current_trace.crew_type if current_trace else CrewType.FLOW,
run_id=current_trace.run_id if current_trace else str(self.flow_id),
parent_trace_id=current_trace.trace_id if current_trace else None,
flow_step={
"function_name": method_name,
"inputs": inputs,
"metadata": {
"crew_name": self.__class__.__name__,
},
},
)
return trace
# LLM trace
def trace_llm_call(func: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator to trace LLM calls.
Args:
func: The function to trace.
Returns:
Wrapped function that creates and manages the LLM call trace context.
"""
@wraps(func)
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
if not should_trace():
return func(self, *args, **kwargs)
trace = build_llm_trace(self, *args, **kwargs)
with TraceContext.set_current(trace):
trace.start_trace()
try:
response = func(self, *args, **kwargs)
# Extract relevant data from response
trace_response = {
"content": response["choices"][0]["message"]["content"],
"finish_reason": response["choices"][0].get("finish_reason"),
}
# Add usage metrics to context
if "usage" in response:
trace.context["tokens_used"] = response["usage"].get(
"total_tokens", 0
)
trace.context["prompt_tokens"] = response["usage"].get(
"prompt_tokens", 0
)
trace.context["completion_tokens"] = response["usage"].get(
"completion_tokens", 0
)
trace.end_trace(trace_response)
return response
except Exception as e:
trace.end_trace(error=str(e))
raise
return wrapper
def build_llm_trace(
self: Any, params: Dict[str, Any], *args: Any, **kwargs: Any
) -> Any:
"""Build a trace controller for an LLM call.
Args:
self: The LLM instance.
params: The parameters for the LLM call.
*args: Variable positional arguments.
**kwargs: Variable keyword arguments.
Returns:
UnifiedTraceController: The configured trace controller for the LLM call.
"""
current_trace = TraceContext.get_current()
agent, task = self._get_execution_context()
# Get new messages and tool results
new_messages = self._get_new_messages(params.get("messages", []))
new_tool_results = self._get_new_tool_results(agent)
# Create trace context
trace = UnifiedTraceController(
trace_type=TraceType.TOOL_CALL if new_tool_results else TraceType.LLM_CALL,
crew_type=current_trace.crew_type if current_trace else CrewType.CREW,
run_type=current_trace.run_type if current_trace else RunType.KICKOFF,
run_id=current_trace.run_id if current_trace else str(uuid4()),
parent_trace_id=current_trace.trace_id if current_trace else None,
agent_role=agent.role if agent else "unknown",
task_id=str(task.id) if task else None,
task_name=task.name if task else None,
task_description=task.description if task else None,
model=self.model,
messages=new_messages,
temperature=self.temperature,
max_tokens=self.max_tokens,
stop_sequences=self.stop,
tool_calls=[
ToolCall(
name=result["tool_name"],
arguments=result["tool_args"],
output=str(result["result"]),
start_time=result.get("start_time", ""),
end_time=datetime.now(UTC),
)
for result in new_tool_results
],
)
return trace

View File

@@ -39,8 +39,8 @@
"validation_error": "### Previous attempt failed validation: {guardrail_result_error}\n\n\n### Previous result:\n{task_output}\n\n\nTry again, making sure to address the validation error."
},
"tools": {
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolute everything you know, don't reference things but instead explain them.",
"ask_question": "Ask a specific question to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the question you have for them, and ALL necessary context to ask the question properly, they know nothing about the question, so share absolute everything you know, don't reference things but instead explain them.",
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolutely everything you know, don't reference things but instead explain them.",
"ask_question": "Ask a specific question to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the question you have for them, and ALL necessary context to ask the question properly, they know nothing about the question, so share absolutely everything you know, don't reference things but instead explain them.",
"add_image": {
"name": "Add image to content",
"description": "See image to understand its content, you can optionally ask a question about the image",

View File

@@ -20,11 +20,11 @@ class ConverterError(Exception):
class Converter(OutputConverter):
"""Class that converts text into either pydantic or json."""
def to_pydantic(self, current_attempt=1):
def to_pydantic(self, current_attempt=1) -> BaseModel:
"""Convert text to pydantic."""
try:
if self.llm.supports_function_calling():
return self._create_instructor().to_pydantic()
result = self._create_instructor().to_pydantic()
else:
response = self.llm.call(
[
@@ -32,18 +32,40 @@ class Converter(OutputConverter):
{"role": "user", "content": self.text},
]
)
return self.model.model_validate_json(response)
try:
# Try to directly validate the response JSON
result = self.model.model_validate_json(response)
except ValidationError:
# If direct validation fails, attempt to extract valid JSON
result = handle_partial_json(response, self.model, False, None)
# Ensure result is a BaseModel instance
if not isinstance(result, BaseModel):
if isinstance(result, dict):
result = self.model.parse_obj(result)
elif isinstance(result, str):
try:
parsed = json.loads(result)
result = self.model.parse_obj(parsed)
except Exception as parse_err:
raise ConverterError(
f"Failed to convert partial JSON result into Pydantic: {parse_err}"
)
else:
raise ConverterError(
"handle_partial_json returned an unexpected type."
)
return result
except ValidationError as e:
if current_attempt < self.max_attempts:
return self.to_pydantic(current_attempt + 1)
raise ConverterError(
f"Failed to convert text into a Pydantic model due to the following validation error: {e}"
f"Failed to convert text into a Pydantic model due to validation error: {e}"
)
except Exception as e:
if current_attempt < self.max_attempts:
return self.to_pydantic(current_attempt + 1)
raise ConverterError(
f"Failed to convert text into a Pydantic model due to the following error: {e}"
f"Failed to convert text into a Pydantic model due to error: {e}"
)
def to_json(self, current_attempt=1):
@@ -197,11 +219,15 @@ def get_conversion_instructions(model: Type[BaseModel], llm: Any) -> str:
if llm.supports_function_calling():
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions += (
f"\n\nThe JSON should follow this schema:\n```json\n{model_schema}\n```"
f"\n\nOutput ONLY the valid JSON and nothing else.\n\n"
f"The JSON must follow this schema exactly:\n```json\n{model_schema}\n```"
)
else:
model_description = generate_model_description(model)
instructions += f"\n\nThe JSON should follow this format:\n{model_description}"
instructions += (
f"\n\nOutput ONLY the valid JSON and nothing else.\n\n"
f"The JSON must follow this format exactly:\n{model_description}"
)
return instructions

View File

@@ -34,6 +34,7 @@ from .tool_usage_events import (
ToolUsageEvent,
ToolValidateInputErrorEvent,
)
from .llm_events import LLMCallCompletedEvent, LLMCallFailedEvent, LLMCallStartedEvent
# events
from .event_listener import EventListener

View File

@@ -1,9 +1,17 @@
from pydantic import PrivateAttr
from typing import Any, Dict
from pydantic import Field, PrivateAttr
from crewai.task import Task
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import Logger
from crewai.utilities.constants import EMITTER_COLOR
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
)
from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent
from .crew_events import (
@@ -37,6 +45,7 @@ class EventListener(BaseEventListener):
_instance = None
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
logger = Logger(verbose=True, default_color=EMITTER_COLOR)
execution_spans: Dict[Task, Any] = Field(default_factory=dict)
def __new__(cls):
if cls._instance is None:
@@ -49,6 +58,7 @@ class EventListener(BaseEventListener):
super().__init__()
self._telemetry = Telemetry()
self._telemetry.set_tracer()
self.execution_spans = {}
self._initialized = True
# ----------- CREW EVENTS -----------
@@ -57,7 +67,7 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent):
self.logger.log(
f"🚀 Crew '{event.crew_name}' started",
f"🚀 Crew '{event.crew_name}' started, {source.id}",
event.timestamp,
)
self._telemetry.crew_execution_span(source, event.inputs)
@@ -67,28 +77,28 @@ class EventListener(BaseEventListener):
final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output)
self.logger.log(
f"✅ Crew '{event.crew_name}' completed",
f"✅ Crew '{event.crew_name}' completed, {source.id}",
event.timestamp,
)
@crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent):
self.logger.log(
f"❌ Crew '{event.crew_name}' failed",
f"❌ Crew '{event.crew_name}' failed, {source.id}",
event.timestamp,
)
@crewai_event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent):
cloned_crew = source.copy()
cloned_crew._telemetry.test_execution_span(
self._telemetry.test_execution_span(
cloned_crew,
event.n_iterations,
event.inputs,
event.eval_llm,
event.eval_llm or "",
)
self.logger.log(
f"🚀 Crew '{event.crew_name}' started test",
f"🚀 Crew '{event.crew_name}' started test, {source.id}",
event.timestamp,
)
@@ -131,9 +141,9 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStartedEvent):
source._execution_span = self._telemetry.task_started(
crew=source.agent.crew, task=source
)
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
self.execution_spans[source] = span
self.logger.log(
f"📋 Task started: {source.description}",
event.timestamp,
@@ -141,24 +151,22 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent):
if source._execution_span:
self._telemetry.task_ended(
source._execution_span, source, source.agent.crew
)
span = self.execution_spans.get(source)
if span:
self._telemetry.task_ended(span, source, source.agent.crew)
self.logger.log(
f"✅ Task completed: {source.description}",
event.timestamp,
)
source._execution_span = None
self.execution_spans[source] = None
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent):
if source._execution_span:
span = self.execution_spans.get(source)
if span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(
source._execution_span, source, source.agent.crew
)
source._execution_span = None
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
self.logger.log(
f"❌ Task failed: {source.description}",
event.timestamp,
@@ -184,7 +192,7 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent):
self._telemetry.flow_creation_span(self.__class__.__name__)
self._telemetry.flow_creation_span(event.flow_name)
self.logger.log(
f"🌊 Flow Created: '{event.flow_name}'",
event.timestamp,
@@ -193,17 +201,17 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent):
self._telemetry.flow_execution_span(
source.__class__.__name__, list(source._methods.keys())
event.flow_name, list(source._methods.keys())
)
self.logger.log(
f"🤖 Flow Started: '{event.flow_name}'",
f"🤖 Flow Started: '{event.flow_name}', {source.flow_id}",
event.timestamp,
)
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent):
self.logger.log(
f"👍 Flow Finished: '{event.flow_name}'",
f"👍 Flow Finished: '{event.flow_name}', {source.flow_id}",
event.timestamp,
)
@@ -253,5 +261,28 @@ class EventListener(BaseEventListener):
#
)
# ----------- LLM EVENTS -----------
@crewai_event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
self.logger.log(
f"🤖 LLM Call Started",
event.timestamp,
)
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
self.logger.log(
f"✅ LLM Call Completed",
event.timestamp,
)
@crewai_event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event: LLMCallFailedEvent):
self.logger.log(
f"❌ LLM Call Failed: '{event.error}'",
event.timestamp,
)
event_listener = EventListener()

View File

@@ -0,0 +1,36 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from crewai.utilities.events.base_events import CrewEvent
class LLMCallType(Enum):
"""Type of LLM call being made"""
TOOL_CALL = "tool_call"
LLM_CALL = "llm_call"
class LLMCallStartedEvent(CrewEvent):
"""Event emitted when a LLM call starts"""
type: str = "llm_call_started"
messages: Union[str, List[Dict[str, str]]]
tools: Optional[List[dict]] = None
callbacks: Optional[List[Any]] = None
available_functions: Optional[Dict[str, Any]] = None
class LLMCallCompletedEvent(CrewEvent):
"""Event emitted when a LLM call completes"""
type: str = "llm_call_completed"
response: Any
call_type: LLMCallType
class LLMCallFailedEvent(CrewEvent):
"""Event emitted when a LLM call fails"""
error: str
type: str = "llm_call_failed"

View File

@@ -1,4 +1,4 @@
from typing import Any, Optional
from typing import Optional
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.events.base_events import CrewEvent

View File

@@ -44,6 +44,7 @@ def create_llm(
# Extract attributes with explicit types
model = (
getattr(llm_value, "model_name", None)
or getattr(llm_value, "model", None)
or getattr(llm_value, "deployment_name", None)
or str(llm_value)
)

View File

@@ -1,12 +0,0 @@
from typing import Any, Protocol, runtime_checkable
@runtime_checkable
class AgentExecutorProtocol(Protocol):
"""Protocol defining the expected interface for an agent executor."""
@property
def agent(self) -> Any: ...
@property
def task(self) -> Any: ...

View File

@@ -1,15 +1,52 @@
import warnings
from typing import Any, Dict, Optional
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from langchain_core.callbacks.base import BaseCallbackHandler
from litellm.integrations.custom_logger import CustomLogger
from litellm.types.utils import Usage
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
class TokenCalcHandler(CustomLogger):
def __init__(self, token_cost_process: Optional[TokenProcess]):
self.token_cost_process = token_cost_process
class AbstractTokenCounter(ABC):
"""
Abstract base class for token counting callbacks.
Implementations should track token usage from different LLM providers.
"""
def __init__(self, token_process: Optional[TokenProcess] = None):
"""Initialize with a TokenProcess instance to track tokens."""
self.token_process = token_process
@abstractmethod
def update_token_usage(self, prompt_tokens: int, completion_tokens: int) -> None:
"""Update token usage counts in the token process."""
pass
class LiteLLMTokenCounter(CustomLogger, AbstractTokenCounter):
"""
Token counter implementation for LiteLLM.
Uses LiteLLM's CustomLogger interface to track token usage.
"""
def __init__(self, token_process: Optional[TokenProcess] = None):
AbstractTokenCounter.__init__(self, token_process)
CustomLogger.__init__(self)
def update_token_usage(self, prompt_tokens: int, completion_tokens: int) -> None:
"""Update token usage counts in the token process."""
if self.token_process is None:
return
if prompt_tokens > 0:
self.token_process.sum_prompt_tokens(prompt_tokens)
if completion_tokens > 0:
self.token_process.sum_completion_tokens(completion_tokens)
self.token_process.sum_successful_requests(1)
def log_success_event(
self,
@@ -18,7 +55,11 @@ class TokenCalcHandler(CustomLogger):
start_time: float,
end_time: float,
) -> None:
if self.token_cost_process is None:
"""
Process successful LLM call and extract token usage information.
This method is called by LiteLLM after a successful completion.
"""
if self.token_process is None:
return
with warnings.catch_warnings():
@@ -26,12 +67,159 @@ class TokenCalcHandler(CustomLogger):
if isinstance(response_obj, dict) and "usage" in response_obj:
usage: Usage = response_obj["usage"]
if usage:
self.token_cost_process.sum_successful_requests(1)
prompt_tokens = 0
completion_tokens = 0
if hasattr(usage, "prompt_tokens"):
self.token_cost_process.sum_prompt_tokens(usage.prompt_tokens)
prompt_tokens = usage.prompt_tokens
elif isinstance(usage, dict) and "prompt_tokens" in usage:
prompt_tokens = usage["prompt_tokens"]
if hasattr(usage, "completion_tokens"):
self.token_cost_process.sum_completion_tokens(usage.completion_tokens)
if hasattr(usage, "prompt_tokens_details") and usage.prompt_tokens_details:
self.token_cost_process.sum_cached_prompt_tokens(
completion_tokens = usage.completion_tokens
elif isinstance(usage, dict) and "completion_tokens" in usage:
completion_tokens = usage["completion_tokens"]
self.update_token_usage(prompt_tokens, completion_tokens)
# Handle cached tokens if available
if (
hasattr(usage, "prompt_tokens_details")
and usage.prompt_tokens_details
and usage.prompt_tokens_details.cached_tokens
):
self.token_process.sum_cached_prompt_tokens(
usage.prompt_tokens_details.cached_tokens
)
class LangChainTokenCounter(BaseCallbackHandler, AbstractTokenCounter):
"""
Token counter implementation for LangChain.
Implements the necessary callback methods to track token usage from LangChain responses.
"""
def __init__(self, token_process: Optional[TokenProcess] = None):
BaseCallbackHandler.__init__(self)
AbstractTokenCounter.__init__(self, token_process)
def update_token_usage(self, prompt_tokens: int, completion_tokens: int) -> None:
"""Update token usage counts in the token process."""
if self.token_process is None:
return
if prompt_tokens > 0:
self.token_process.sum_prompt_tokens(prompt_tokens)
if completion_tokens > 0:
self.token_process.sum_completion_tokens(completion_tokens)
self.token_process.sum_successful_requests(1)
@property
def ignore_llm(self) -> bool:
return False
@property
def ignore_chain(self) -> bool:
return True
@property
def ignore_agent(self) -> bool:
return False
@property
def ignore_chat_model(self) -> bool:
return False
@property
def ignore_retriever(self) -> bool:
return True
@property
def ignore_tools(self) -> bool:
return True
def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> None:
"""Called when LLM starts processing."""
pass
def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
"""Called when LLM generates a new token."""
pass
def on_llm_end(self, response: Any, **kwargs: Any) -> None:
"""
Called when LLM ends processing.
Extracts token usage from LangChain response objects.
"""
if self.token_process is None:
return
# Handle LangChain response format
if hasattr(response, "llm_output") and isinstance(response.llm_output, dict):
token_usage = response.llm_output.get("token_usage", {})
prompt_tokens = token_usage.get("prompt_tokens", 0)
completion_tokens = token_usage.get("completion_tokens", 0)
self.update_token_usage(prompt_tokens, completion_tokens)
def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
"""Called when LLM errors."""
pass
def on_chain_start(
self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
) -> None:
"""Called when a chain starts."""
pass
def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
"""Called when a chain ends."""
pass
def on_chain_error(self, error: BaseException, **kwargs: Any) -> None:
"""Called when a chain errors."""
pass
def on_tool_start(
self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
) -> None:
"""Called when a tool starts."""
pass
def on_tool_end(self, output: str, **kwargs: Any) -> None:
"""Called when a tool ends."""
pass
def on_tool_error(self, error: BaseException, **kwargs: Any) -> None:
"""Called when a tool errors."""
pass
def on_text(self, text: str, **kwargs: Any) -> None:
"""Called when text is generated."""
pass
def on_agent_start(self, serialized: Dict[str, Any], **kwargs: Any) -> None:
"""Called when an agent starts."""
pass
def on_agent_end(self, output: Any, **kwargs: Any) -> None:
"""Called when an agent ends."""
pass
def on_agent_error(self, error: BaseException, **kwargs: Any) -> None:
"""Called when an agent errors."""
pass
# For backward compatibility
class TokenCalcHandler(LiteLLMTokenCounter):
"""
Alias for LiteLLMTokenCounter.
"""
pass

View File

@@ -915,8 +915,6 @@ def test_tool_result_as_answer_is_the_final_answer_for_the_agent():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_tool_usage_information_is_appended_to_agent():
from datetime import UTC, datetime
from crewai.tools import BaseTool
class MyCustomTool(BaseTool):
@@ -926,36 +924,30 @@ def test_tool_usage_information_is_appended_to_agent():
def _run(self) -> str:
return "Howdy!"
fixed_datetime = datetime(2025, 2, 10, 12, 0, 0, tzinfo=UTC)
with patch("datetime.datetime") as mock_datetime:
mock_datetime.now.return_value = fixed_datetime
mock_datetime.side_effect = lambda *args, **kw: datetime(*args, **kw)
agent1 = Agent(
role="Friendly Neighbor",
goal="Make everyone feel welcome",
backstory="You are the friendly neighbor",
tools=[MyCustomTool(result_as_answer=True)],
)
agent1 = Agent(
role="Friendly Neighbor",
goal="Make everyone feel welcome",
backstory="You are the friendly neighbor",
tools=[MyCustomTool(result_as_answer=True)],
)
greeting = Task(
description="Say an appropriate greeting.",
expected_output="The greeting.",
agent=agent1,
)
tasks = [greeting]
crew = Crew(agents=[agent1], tasks=tasks)
greeting = Task(
description="Say an appropriate greeting.",
expected_output="The greeting.",
agent=agent1,
)
tasks = [greeting]
crew = Crew(agents=[agent1], tasks=tasks)
crew.kickoff()
assert agent1.tools_results == [
{
"result": "Howdy!",
"tool_name": "Decide Greetings",
"tool_args": {},
"result_as_answer": True,
"start_time": fixed_datetime,
}
]
crew.kickoff()
assert agent1.tools_results == [
{
"result": "Howdy!",
"tool_name": "Decide Greetings",
"tool_args": {},
"result_as_answer": True,
}
]
def test_agent_definition_based_on_dict():

View File

@@ -547,6 +547,7 @@ def test_crew_with_delegating_agents():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_with_delegating_agents_should_not_override_task_tools():
from typing import Type
from pydantic import BaseModel, Field
@@ -833,6 +834,12 @@ def test_crew_verbose_output(capsys):
crew.kickoff()
captured = capsys.readouterr()
# Filter out event listener logs (lines starting with '[')
filtered_output = "\n".join(
line for line in captured.out.split("\n") if not line.startswith("[")
)
expected_strings = [
"\x1b[1m\x1b[95m# Agent:\x1b[00m \x1b[1m\x1b[92mResearcher",
"\x1b[00m\n\x1b[95m## Task:\x1b[00m \x1b[92mResearch AI advancements.",
@@ -845,27 +852,19 @@ def test_crew_verbose_output(capsys):
]
for expected_string in expected_strings:
assert expected_string in captured.out
assert expected_string in filtered_output
# Now test with verbose set to False
crew.verbose = False
crew._logger = Logger(verbose=False)
crew.kickoff()
expected_listener_logs = [
"[🚀 CREW 'CREW' STARTED]",
"[📋 TASK STARTED: RESEARCH AI ADVANCEMENTS.]",
"[🤖 AGENT 'RESEARCHER' STARTED TASK]",
"[✅ AGENT 'RESEARCHER' COMPLETED TASK]",
"[✅ TASK COMPLETED: RESEARCH AI ADVANCEMENTS.]",
"[📋 TASK STARTED: WRITE ABOUT AI IN HEALTHCARE.]",
"[🤖 AGENT 'SENIOR WRITER' STARTED TASK]",
"[✅ AGENT 'SENIOR WRITER' COMPLETED TASK]",
"[✅ TASK COMPLETED: WRITE ABOUT AI IN HEALTHCARE.]",
"[✅ CREW 'CREW' COMPLETED]",
]
captured = capsys.readouterr()
for log in expected_listener_logs:
assert log in captured.out
filtered_output = "\n".join(
line
for line in captured.out.split("\n")
if not line.startswith("[") and line.strip() and not line.startswith("\x1b")
)
assert filtered_output == ""
@pytest.mark.vcr(filter_headers=["authorization"])

View File

@@ -654,3 +654,104 @@ def test_flow_plotting():
assert isinstance(received_events[0], FlowPlotEvent)
assert received_events[0].flow_name == "StatelessFlow"
assert isinstance(received_events[0].timestamp, datetime)
def test_multiple_routers_from_same_trigger():
"""Test that multiple routers triggered by the same method all activate their listeners."""
execution_order = []
class MultiRouterFlow(Flow):
def __init__(self):
super().__init__()
# Set diagnosed conditions to trigger all routers
self.state["diagnosed_conditions"] = "DHA" # Contains D, H, and A
@start()
def scan_medical(self):
execution_order.append("scan_medical")
return "scan_complete"
@router(scan_medical)
def diagnose_conditions(self):
execution_order.append("diagnose_conditions")
return "diagnosis_complete"
@router(diagnose_conditions)
def diabetes_router(self):
execution_order.append("diabetes_router")
if "D" in self.state["diagnosed_conditions"]:
return "diabetes"
return None
@listen("diabetes")
def diabetes_analysis(self):
execution_order.append("diabetes_analysis")
return "diabetes_analysis_complete"
@router(diagnose_conditions)
def hypertension_router(self):
execution_order.append("hypertension_router")
if "H" in self.state["diagnosed_conditions"]:
return "hypertension"
return None
@listen("hypertension")
def hypertension_analysis(self):
execution_order.append("hypertension_analysis")
return "hypertension_analysis_complete"
@router(diagnose_conditions)
def anemia_router(self):
execution_order.append("anemia_router")
if "A" in self.state["diagnosed_conditions"]:
return "anemia"
return None
@listen("anemia")
def anemia_analysis(self):
execution_order.append("anemia_analysis")
return "anemia_analysis_complete"
flow = MultiRouterFlow()
flow.kickoff()
# Verify all methods were called
assert "scan_medical" in execution_order
assert "diagnose_conditions" in execution_order
# Verify all routers were called
assert "diabetes_router" in execution_order
assert "hypertension_router" in execution_order
assert "anemia_router" in execution_order
# Verify all listeners were called - this is the key test for the fix
assert "diabetes_analysis" in execution_order
assert "hypertension_analysis" in execution_order
assert "anemia_analysis" in execution_order
# Verify execution order constraints
assert execution_order.index("diagnose_conditions") > execution_order.index(
"scan_medical"
)
# All routers should execute after diagnose_conditions
assert execution_order.index("diabetes_router") > execution_order.index(
"diagnose_conditions"
)
assert execution_order.index("hypertension_router") > execution_order.index(
"diagnose_conditions"
)
assert execution_order.index("anemia_router") > execution_order.index(
"diagnose_conditions"
)
# All analyses should execute after their respective routers
assert execution_order.index("diabetes_analysis") > execution_order.index(
"diabetes_router"
)
assert execution_order.index("hypertension_analysis") > execution_order.index(
"hypertension_router"
)
assert execution_order.index("anemia_analysis") > execution_order.index(
"anemia_router"
)

View File

@@ -6,7 +6,7 @@ import pytest
from pydantic import BaseModel
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.llm import LLM
from crewai.llm import CONTEXT_WINDOW_USAGE_RATIO, LLM
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.tool_usage_events import ToolExecutionErrorEvent
from crewai.utilities.token_counter_callback import TokenCalcHandler
@@ -18,15 +18,15 @@ def test_llm_callback_replacement():
llm1 = LLM(model="gpt-4o-mini")
llm2 = LLM(model="gpt-4o-mini")
calc_handler_1 = TokenCalcHandler(token_cost_process=TokenProcess())
calc_handler_2 = TokenCalcHandler(token_cost_process=TokenProcess())
calc_handler_1 = TokenCalcHandler(token_process=TokenProcess())
calc_handler_2 = TokenCalcHandler(token_process=TokenProcess())
result1 = llm1.call(
messages=[{"role": "user", "content": "Hello, world!"}],
callbacks=[calc_handler_1],
)
print("result1:", result1)
usage_metrics_1 = calc_handler_1.token_cost_process.get_summary()
usage_metrics_1 = calc_handler_1.token_process.get_summary()
print("usage_metrics_1:", usage_metrics_1)
result2 = llm2.call(
@@ -35,13 +35,13 @@ def test_llm_callback_replacement():
)
sleep(5)
print("result2:", result2)
usage_metrics_2 = calc_handler_2.token_cost_process.get_summary()
usage_metrics_2 = calc_handler_2.token_process.get_summary()
print("usage_metrics_2:", usage_metrics_2)
# The first handler should not have been updated
assert usage_metrics_1.successful_requests == 1
assert usage_metrics_2.successful_requests == 1
assert usage_metrics_1 == calc_handler_1.token_cost_process.get_summary()
assert usage_metrics_1 == calc_handler_1.token_process.get_summary()
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -57,14 +57,14 @@ def test_llm_call_with_string_input():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_call_with_string_input_and_callbacks():
llm = LLM(model="gpt-4o-mini")
calc_handler = TokenCalcHandler(token_cost_process=TokenProcess())
calc_handler = TokenCalcHandler(token_process=TokenProcess())
# Test the call method with a string input and callbacks
result = llm.call(
"Tell me a joke.",
callbacks=[calc_handler],
)
usage_metrics = calc_handler.token_cost_process.get_summary()
usage_metrics = calc_handler.token_process.get_summary()
assert isinstance(result, str)
assert len(result.strip()) > 0
@@ -286,6 +286,24 @@ def test_o3_mini_reasoning_effort_medium():
assert "Paris" in result
def test_context_window_validation():
"""Test that context window validation works correctly."""
# Test valid window size
llm = LLM(model="o3-mini")
assert llm.get_context_window_size() == int(200000 * CONTEXT_WINDOW_USAGE_RATIO)
# Test invalid window size
with pytest.raises(ValueError) as excinfo:
with patch.dict(
"crewai.llm.LLM_CONTEXT_WINDOW_SIZES",
{"test-model": 500}, # Below minimum
clear=True,
):
llm = LLM(model="test-model")
llm.get_context_window_size()
assert "must be between 1024 and 2097152" in str(excinfo.value)
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.fixture
def anthropic_llm():

View File

@@ -1,360 +0,0 @@
import os
from datetime import UTC, datetime
from unittest.mock import MagicMock, patch
from uuid import UUID
import pytest
from crewai.traces.context import TraceContext
from crewai.traces.enums import CrewType, RunType, TraceType
from crewai.traces.models import (
CrewTrace,
FlowStepIO,
LLMRequest,
LLMResponse,
)
from crewai.traces.unified_trace_controller import (
UnifiedTraceController,
init_crew_main_trace,
init_flow_main_trace,
should_trace,
trace_flow_step,
trace_llm_call,
)
class TestUnifiedTraceController:
@pytest.fixture
def basic_trace_controller(self):
return UnifiedTraceController(
trace_type=TraceType.LLM_CALL,
run_type=RunType.KICKOFF,
crew_type=CrewType.CREW,
run_id="test-run-id",
agent_role="test-agent",
task_name="test-task",
task_description="test description",
task_id="test-task-id",
)
def test_initialization(self, basic_trace_controller):
"""Test basic initialization of UnifiedTraceController"""
assert basic_trace_controller.trace_type == TraceType.LLM_CALL
assert basic_trace_controller.run_type == RunType.KICKOFF
assert basic_trace_controller.crew_type == CrewType.CREW
assert basic_trace_controller.run_id == "test-run-id"
assert basic_trace_controller.agent_role == "test-agent"
assert basic_trace_controller.task_name == "test-task"
assert basic_trace_controller.task_description == "test description"
assert basic_trace_controller.task_id == "test-task-id"
assert basic_trace_controller.status == "running"
assert isinstance(UUID(basic_trace_controller.trace_id), UUID)
def test_start_trace(self, basic_trace_controller):
"""Test starting a trace"""
result = basic_trace_controller.start_trace()
assert result == basic_trace_controller
assert basic_trace_controller.start_time is not None
assert isinstance(basic_trace_controller.start_time, datetime)
def test_end_trace_success(self, basic_trace_controller):
"""Test ending a trace successfully"""
basic_trace_controller.start_trace()
basic_trace_controller.end_trace(result={"test": "result"})
assert basic_trace_controller.end_time is not None
assert basic_trace_controller.status == "completed"
assert basic_trace_controller.error is None
assert basic_trace_controller.context.get("response") == {"test": "result"}
def test_end_trace_with_error(self, basic_trace_controller):
"""Test ending a trace with an error"""
basic_trace_controller.start_trace()
basic_trace_controller.end_trace(error="Test error occurred")
assert basic_trace_controller.end_time is not None
assert basic_trace_controller.status == "error"
assert basic_trace_controller.error == "Test error occurred"
def test_add_child_trace(self, basic_trace_controller):
"""Test adding a child trace"""
child_trace = {"id": "child-1", "type": "test"}
basic_trace_controller.add_child_trace(child_trace)
assert len(basic_trace_controller.children) == 1
assert basic_trace_controller.children[0] == child_trace
def test_to_crew_trace_llm_call(self):
"""Test converting to CrewTrace for LLM call"""
test_messages = [{"role": "user", "content": "test"}]
test_response = {
"content": "test response",
"finish_reason": "stop",
}
controller = UnifiedTraceController(
trace_type=TraceType.LLM_CALL,
run_type=RunType.KICKOFF,
crew_type=CrewType.CREW,
run_id="test-run-id",
context={
"messages": test_messages,
"temperature": 0.7,
"max_tokens": 100,
},
)
# Set model and messages in the context
controller.context["model"] = "gpt-4"
controller.context["messages"] = test_messages
controller.start_trace()
controller.end_trace(result=test_response)
crew_trace = controller.to_crew_trace()
assert isinstance(crew_trace, CrewTrace)
assert isinstance(crew_trace.request, LLMRequest)
assert isinstance(crew_trace.response, LLMResponse)
assert crew_trace.request.model == "gpt-4"
assert crew_trace.request.messages == test_messages
assert crew_trace.response.content == test_response["content"]
assert crew_trace.response.finish_reason == test_response["finish_reason"]
def test_to_crew_trace_flow_step(self):
"""Test converting to CrewTrace for flow step"""
flow_step_data = {
"function_name": "test_function",
"inputs": {"param1": "value1"},
"metadata": {"meta": "data"},
}
controller = UnifiedTraceController(
trace_type=TraceType.FLOW_STEP,
run_type=RunType.KICKOFF,
crew_type=CrewType.FLOW,
run_id="test-run-id",
flow_step=flow_step_data,
)
controller.start_trace()
controller.end_trace(result="test result")
crew_trace = controller.to_crew_trace()
assert isinstance(crew_trace, CrewTrace)
assert isinstance(crew_trace.flow_step, FlowStepIO)
assert crew_trace.flow_step.function_name == "test_function"
assert crew_trace.flow_step.inputs == {"param1": "value1"}
assert crew_trace.flow_step.outputs == {"result": "test result"}
def test_should_trace(self):
"""Test should_trace function"""
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
assert should_trace() is True
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "false"}):
assert should_trace() is False
with patch.dict(os.environ, clear=True):
assert should_trace() is False
@pytest.mark.asyncio
async def test_trace_flow_step_decorator(self):
"""Test trace_flow_step decorator"""
class TestFlow:
flow_id = "test-flow-id"
@trace_flow_step
async def test_method(self, method_name, method, *args, **kwargs):
return "test result"
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
flow = TestFlow()
result = await flow.test_method("test_method", lambda x: x, arg1="value1")
assert result == "test result"
def test_trace_llm_call_decorator(self):
"""Test trace_llm_call decorator"""
class TestLLM:
model = "gpt-4"
temperature = 0.7
max_tokens = 100
stop = None
def _get_execution_context(self):
return MagicMock(), MagicMock()
def _get_new_messages(self, messages):
return messages
def _get_new_tool_results(self, agent):
return []
@trace_llm_call
def test_method(self, params):
return {
"choices": [
{
"message": {"content": "test response"},
"finish_reason": "stop",
}
],
"usage": {
"total_tokens": 50,
"prompt_tokens": 20,
"completion_tokens": 30,
},
}
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
llm = TestLLM()
result = llm.test_method({"messages": []})
assert result["choices"][0]["message"]["content"] == "test response"
def test_init_crew_main_trace_kickoff(self):
"""Test init_crew_main_trace in kickoff mode"""
trace_context = None
class TestCrew:
id = "test-crew-id"
_test = False
_train = False
@init_crew_main_trace
def test_method(self):
nonlocal trace_context
trace_context = TraceContext.get_current()
return "test result"
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
crew = TestCrew()
result = test_method(crew)
assert result == "test result"
assert trace_context is not None
assert trace_context.trace_type == TraceType.LLM_CALL
assert trace_context.run_type == RunType.KICKOFF
assert trace_context.crew_type == CrewType.CREW
assert trace_context.run_id == str(crew.id)
def test_init_crew_main_trace_test_mode(self):
"""Test init_crew_main_trace in test mode"""
trace_context = None
class TestCrew:
id = "test-crew-id"
_test = True
_train = False
@init_crew_main_trace
def test_method(self):
nonlocal trace_context
trace_context = TraceContext.get_current()
return "test result"
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
crew = TestCrew()
result = test_method(crew)
assert result == "test result"
assert trace_context is not None
assert trace_context.run_type == RunType.TEST
def test_init_crew_main_trace_train_mode(self):
"""Test init_crew_main_trace in train mode"""
trace_context = None
class TestCrew:
id = "test-crew-id"
_test = False
_train = True
@init_crew_main_trace
def test_method(self):
nonlocal trace_context
trace_context = TraceContext.get_current()
return "test result"
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
crew = TestCrew()
result = test_method(crew)
assert result == "test result"
assert trace_context is not None
assert trace_context.run_type == RunType.TRAIN
@pytest.mark.asyncio
async def test_init_flow_main_trace(self):
"""Test init_flow_main_trace decorator"""
trace_context = None
test_inputs = {"test": "input"}
class TestFlow:
flow_id = "test-flow-id"
@init_flow_main_trace
async def test_method(self, **kwargs):
nonlocal trace_context
trace_context = TraceContext.get_current()
# Verify the context is set during execution
assert trace_context.context["context"]["inputs"] == test_inputs
return "test result"
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
flow = TestFlow()
result = await flow.test_method(inputs=test_inputs)
assert result == "test result"
assert trace_context is not None
assert trace_context.trace_type == TraceType.FLOW_STEP
assert trace_context.crew_type == CrewType.FLOW
assert trace_context.run_type == RunType.KICKOFF
assert trace_context.run_id == str(flow.flow_id)
assert trace_context.context["context"]["inputs"] == test_inputs
def test_trace_context_management(self):
"""Test TraceContext management"""
trace1 = UnifiedTraceController(
trace_type=TraceType.LLM_CALL,
run_type=RunType.KICKOFF,
crew_type=CrewType.CREW,
run_id="test-run-1",
)
trace2 = UnifiedTraceController(
trace_type=TraceType.FLOW_STEP,
run_type=RunType.TEST,
crew_type=CrewType.FLOW,
run_id="test-run-2",
)
# Test that context is initially empty
assert TraceContext.get_current() is None
# Test setting and getting context
with TraceContext.set_current(trace1):
assert TraceContext.get_current() == trace1
# Test nested context
with TraceContext.set_current(trace2):
assert TraceContext.get_current() == trace2
# Test context restoration after nested block
assert TraceContext.get_current() == trace1
# Test context cleanup after with block
assert TraceContext.get_current() is None
def test_trace_context_error_handling(self):
"""Test TraceContext error handling"""
trace = UnifiedTraceController(
trace_type=TraceType.LLM_CALL,
run_type=RunType.KICKOFF,
crew_type=CrewType.CREW,
run_id="test-run",
)
# Test that context is properly cleaned up even if an error occurs
try:
with TraceContext.set_current(trace):
raise ValueError("Test error")
except ValueError:
pass
assert TraceContext.get_current() is None

View File

@@ -1,14 +1,9 @@
interactions:
- request:
body: '{"model": "llama3.2:3b", "prompt": "### User:\nName: Alice Llama, Age:
30\n\n### System:\nProduce JSON OUTPUT ONLY! Adhere to this format {\"name\":
\"function_name\", \"arguments\":{\"argument_name\": \"argument_value\"}} The
following functions are available to you:\n{''type'': ''function'', ''function'':
{''name'': ''SimpleModel'', ''description'': ''Correctly extracted `SimpleModel`
with all the required parameters with correct types'', ''parameters'': {''properties'':
{''name'': {''title'': ''Name'', ''type'': ''string''}, ''age'': {''title'':
''Age'', ''type'': ''integer''}}, ''required'': [''age'', ''name''], ''type'':
''object''}}}\n\n\n", "options": {}, "stream": false, "format": "json"}'
body: '{"model": "llama3.2:3b", "prompt": "### System:\nPlease convert the following
text into valid JSON.\n\nOutput ONLY the valid JSON and nothing else.\n\nThe
JSON must follow this format exactly:\n{\n \"name\": str,\n \"age\": int\n}\n\n###
User:\nName: Alice Llama, Age: 30\n\n", "options": {"stop": []}, "stream": false}'
headers:
accept:
- '*/*'
@@ -17,23 +12,23 @@ interactions:
connection:
- keep-alive
content-length:
- '657'
- '321'
host:
- localhost:11434
user-agent:
- litellm/1.57.4
- litellm/1.60.2
method: POST
uri: http://localhost:11434/api/generate
response:
content: '{"model":"llama3.2:3b","created_at":"2025-01-15T20:47:11.926411Z","response":"{\"name\":
\"SimpleModel\", \"arguments\":{\"name\": \"Alice Llama\", \"age\": 30}}","done":true,"done_reason":"stop","context":[128006,9125,128007,271,38766,1303,33025,2696,25,6790,220,2366,18,271,128009,128006,882,128007,271,14711,2724,512,678,25,30505,445,81101,11,13381,25,220,966,271,14711,744,512,1360,13677,4823,32090,27785,0,2467,6881,311,420,3645,5324,609,794,330,1723,1292,498,330,16774,23118,14819,1292,794,330,14819,3220,32075,578,2768,5865,527,2561,311,499,512,13922,1337,1232,364,1723,518,364,1723,1232,5473,609,1232,364,16778,1747,518,364,4789,1232,364,34192,398,28532,1595,16778,1747,63,449,682,279,2631,5137,449,4495,4595,518,364,14105,1232,5473,13495,1232,5473,609,1232,5473,2150,1232,364,678,518,364,1337,1232,364,928,25762,364,425,1232,5473,2150,1232,364,17166,518,364,1337,1232,364,11924,8439,2186,364,6413,1232,2570,425,518,364,609,4181,364,1337,1232,364,1735,23742,3818,128009,128006,78191,128007,271,5018,609,794,330,16778,1747,498,330,16774,23118,609,794,330,62786,445,81101,498,330,425,794,220,966,3500],"total_duration":3374470708,"load_duration":1075750500,"prompt_eval_count":167,"prompt_eval_duration":1871000000,"eval_count":24,"eval_duration":426000000}'
content: '{"model":"llama3.2:3b","created_at":"2025-02-21T02:57:55.059392Z","response":"{\"name\":
\"Alice Llama\", \"age\": 30}","done":true,"done_reason":"stop","context":[128006,9125,128007,271,38766,1303,33025,2696,25,6790,220,2366,18,271,128009,128006,882,128007,271,14711,744,512,5618,5625,279,2768,1495,1139,2764,4823,382,5207,27785,279,2764,4823,323,4400,775,382,791,4823,2011,1833,420,3645,7041,512,517,220,330,609,794,610,345,220,330,425,794,528,198,633,14711,2724,512,678,25,30505,445,81101,11,13381,25,220,966,271,128009,128006,78191,128007,271,5018,609,794,330,62786,445,81101,498,330,425,794,220,966,92],"total_duration":4675906000,"load_duration":836091458,"prompt_eval_count":82,"prompt_eval_duration":3561000000,"eval_count":15,"eval_duration":275000000}'
headers:
Content-Length:
- '1263'
- '761'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 15 Jan 2025 20:47:12 GMT
- Fri, 21 Feb 2025 02:57:55 GMT
http_version: HTTP/1.1
status_code: 200
- request:
@@ -52,7 +47,7 @@ interactions:
host:
- localhost:11434
user-agent:
- litellm/1.57.4
- litellm/1.60.2
method: POST
uri: http://localhost:11434/api/show
response:
@@ -228,7 +223,7 @@ interactions:
Reporting violations of the Acceptable Use Policy or unlicensed uses of Llama
3.2: LlamaUseReport@meta.com\",\"modelfile\":\"# Modelfile generated by \\\"ollama
show\\\"\\n# To build a new Modelfile based on this, replace FROM with:\\n#
FROM llama3.2:3b\\n\\nFROM /Users/brandonhancock/.ollama/models/blobs/sha256-dde5aa3fc5ffc17176b5e8bdc82f587b24b2678c6c66101bf7da77af9f7ccdff\\nTEMPLATE
FROM llama3.2:3b\\n\\nFROM /Users/joaomoura/.ollama/models/blobs/sha256-dde5aa3fc5ffc17176b5e8bdc82f587b24b2678c6c66101bf7da77af9f7ccdff\\nTEMPLATE
\\\"\\\"\\\"\\u003c|start_header_id|\\u003esystem\\u003c|end_header_id|\\u003e\\n\\nCutting
Knowledge Date: December 2023\\n\\n{{ if .System }}{{ .System }}\\n{{- end }}\\n{{-
if .Tools }}When you receive a tool call response, use the output to format
@@ -441,12 +436,12 @@ interactions:
.Content }}\\n{{- end }}{{ if not $last }}\\u003c|eot_id|\\u003e{{ end }}\\n{{-
else if eq .Role \\\"tool\\\" }}\\u003c|start_header_id|\\u003eipython\\u003c|end_header_id|\\u003e\\n\\n{{
.Content }}\\u003c|eot_id|\\u003e{{ if $last }}\\u003c|start_header_id|\\u003eassistant\\u003c|end_header_id|\\u003e\\n\\n{{
end }}\\n{{- end }}\\n{{- end }}\",\"details\":{\"parent_model\":\"\",\"format\":\"gguf\",\"family\":\"llama\",\"families\":[\"llama\"],\"parameter_size\":\"3.2B\",\"quantization_level\":\"Q4_K_M\"},\"model_info\":{\"general.architecture\":\"llama\",\"general.basename\":\"Llama-3.2\",\"general.file_type\":15,\"general.finetune\":\"Instruct\",\"general.languages\":[\"en\",\"de\",\"fr\",\"it\",\"pt\",\"hi\",\"es\",\"th\"],\"general.parameter_count\":3212749888,\"general.quantization_version\":2,\"general.size_label\":\"3B\",\"general.tags\":[\"facebook\",\"meta\",\"pytorch\",\"llama\",\"llama-3\",\"text-generation\"],\"general.type\":\"model\",\"llama.attention.head_count\":24,\"llama.attention.head_count_kv\":8,\"llama.attention.key_length\":128,\"llama.attention.layer_norm_rms_epsilon\":0.00001,\"llama.attention.value_length\":128,\"llama.block_count\":28,\"llama.context_length\":131072,\"llama.embedding_length\":3072,\"llama.feed_forward_length\":8192,\"llama.rope.dimension_count\":128,\"llama.rope.freq_base\":500000,\"llama.vocab_size\":128256,\"tokenizer.ggml.bos_token_id\":128000,\"tokenizer.ggml.eos_token_id\":128009,\"tokenizer.ggml.merges\":null,\"tokenizer.ggml.model\":\"gpt2\",\"tokenizer.ggml.pre\":\"llama-bpe\",\"tokenizer.ggml.token_type\":null,\"tokenizer.ggml.tokens\":null},\"modified_at\":\"2024-12-31T11:53:14.529771974-05:00\"}"
end }}\\n{{- end }}\\n{{- end }}\",\"details\":{\"parent_model\":\"\",\"format\":\"gguf\",\"family\":\"llama\",\"families\":[\"llama\"],\"parameter_size\":\"3.2B\",\"quantization_level\":\"Q4_K_M\"},\"model_info\":{\"general.architecture\":\"llama\",\"general.basename\":\"Llama-3.2\",\"general.file_type\":15,\"general.finetune\":\"Instruct\",\"general.languages\":[\"en\",\"de\",\"fr\",\"it\",\"pt\",\"hi\",\"es\",\"th\"],\"general.parameter_count\":3212749888,\"general.quantization_version\":2,\"general.size_label\":\"3B\",\"general.tags\":[\"facebook\",\"meta\",\"pytorch\",\"llama\",\"llama-3\",\"text-generation\"],\"general.type\":\"model\",\"llama.attention.head_count\":24,\"llama.attention.head_count_kv\":8,\"llama.attention.key_length\":128,\"llama.attention.layer_norm_rms_epsilon\":0.00001,\"llama.attention.value_length\":128,\"llama.block_count\":28,\"llama.context_length\":131072,\"llama.embedding_length\":3072,\"llama.feed_forward_length\":8192,\"llama.rope.dimension_count\":128,\"llama.rope.freq_base\":500000,\"llama.vocab_size\":128256,\"tokenizer.ggml.bos_token_id\":128000,\"tokenizer.ggml.eos_token_id\":128009,\"tokenizer.ggml.merges\":null,\"tokenizer.ggml.model\":\"gpt2\",\"tokenizer.ggml.pre\":\"llama-bpe\",\"tokenizer.ggml.token_type\":null,\"tokenizer.ggml.tokens\":null},\"modified_at\":\"2025-02-20T18:55:09.150577031-08:00\"}"
headers:
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 15 Jan 2025 20:47:12 GMT
- Fri, 21 Feb 2025 02:57:55 GMT
Transfer-Encoding:
- chunked
http_version: HTTP/1.1
@@ -467,7 +462,7 @@ interactions:
host:
- localhost:11434
user-agent:
- litellm/1.57.4
- litellm/1.60.2
method: POST
uri: http://localhost:11434/api/show
response:
@@ -643,7 +638,7 @@ interactions:
Reporting violations of the Acceptable Use Policy or unlicensed uses of Llama
3.2: LlamaUseReport@meta.com\",\"modelfile\":\"# Modelfile generated by \\\"ollama
show\\\"\\n# To build a new Modelfile based on this, replace FROM with:\\n#
FROM llama3.2:3b\\n\\nFROM /Users/brandonhancock/.ollama/models/blobs/sha256-dde5aa3fc5ffc17176b5e8bdc82f587b24b2678c6c66101bf7da77af9f7ccdff\\nTEMPLATE
FROM llama3.2:3b\\n\\nFROM /Users/joaomoura/.ollama/models/blobs/sha256-dde5aa3fc5ffc17176b5e8bdc82f587b24b2678c6c66101bf7da77af9f7ccdff\\nTEMPLATE
\\\"\\\"\\\"\\u003c|start_header_id|\\u003esystem\\u003c|end_header_id|\\u003e\\n\\nCutting
Knowledge Date: December 2023\\n\\n{{ if .System }}{{ .System }}\\n{{- end }}\\n{{-
if .Tools }}When you receive a tool call response, use the output to format
@@ -856,12 +851,12 @@ interactions:
.Content }}\\n{{- end }}{{ if not $last }}\\u003c|eot_id|\\u003e{{ end }}\\n{{-
else if eq .Role \\\"tool\\\" }}\\u003c|start_header_id|\\u003eipython\\u003c|end_header_id|\\u003e\\n\\n{{
.Content }}\\u003c|eot_id|\\u003e{{ if $last }}\\u003c|start_header_id|\\u003eassistant\\u003c|end_header_id|\\u003e\\n\\n{{
end }}\\n{{- end }}\\n{{- end }}\",\"details\":{\"parent_model\":\"\",\"format\":\"gguf\",\"family\":\"llama\",\"families\":[\"llama\"],\"parameter_size\":\"3.2B\",\"quantization_level\":\"Q4_K_M\"},\"model_info\":{\"general.architecture\":\"llama\",\"general.basename\":\"Llama-3.2\",\"general.file_type\":15,\"general.finetune\":\"Instruct\",\"general.languages\":[\"en\",\"de\",\"fr\",\"it\",\"pt\",\"hi\",\"es\",\"th\"],\"general.parameter_count\":3212749888,\"general.quantization_version\":2,\"general.size_label\":\"3B\",\"general.tags\":[\"facebook\",\"meta\",\"pytorch\",\"llama\",\"llama-3\",\"text-generation\"],\"general.type\":\"model\",\"llama.attention.head_count\":24,\"llama.attention.head_count_kv\":8,\"llama.attention.key_length\":128,\"llama.attention.layer_norm_rms_epsilon\":0.00001,\"llama.attention.value_length\":128,\"llama.block_count\":28,\"llama.context_length\":131072,\"llama.embedding_length\":3072,\"llama.feed_forward_length\":8192,\"llama.rope.dimension_count\":128,\"llama.rope.freq_base\":500000,\"llama.vocab_size\":128256,\"tokenizer.ggml.bos_token_id\":128000,\"tokenizer.ggml.eos_token_id\":128009,\"tokenizer.ggml.merges\":null,\"tokenizer.ggml.model\":\"gpt2\",\"tokenizer.ggml.pre\":\"llama-bpe\",\"tokenizer.ggml.token_type\":null,\"tokenizer.ggml.tokens\":null},\"modified_at\":\"2024-12-31T11:53:14.529771974-05:00\"}"
end }}\\n{{- end }}\\n{{- end }}\",\"details\":{\"parent_model\":\"\",\"format\":\"gguf\",\"family\":\"llama\",\"families\":[\"llama\"],\"parameter_size\":\"3.2B\",\"quantization_level\":\"Q4_K_M\"},\"model_info\":{\"general.architecture\":\"llama\",\"general.basename\":\"Llama-3.2\",\"general.file_type\":15,\"general.finetune\":\"Instruct\",\"general.languages\":[\"en\",\"de\",\"fr\",\"it\",\"pt\",\"hi\",\"es\",\"th\"],\"general.parameter_count\":3212749888,\"general.quantization_version\":2,\"general.size_label\":\"3B\",\"general.tags\":[\"facebook\",\"meta\",\"pytorch\",\"llama\",\"llama-3\",\"text-generation\"],\"general.type\":\"model\",\"llama.attention.head_count\":24,\"llama.attention.head_count_kv\":8,\"llama.attention.key_length\":128,\"llama.attention.layer_norm_rms_epsilon\":0.00001,\"llama.attention.value_length\":128,\"llama.block_count\":28,\"llama.context_length\":131072,\"llama.embedding_length\":3072,\"llama.feed_forward_length\":8192,\"llama.rope.dimension_count\":128,\"llama.rope.freq_base\":500000,\"llama.vocab_size\":128256,\"tokenizer.ggml.bos_token_id\":128000,\"tokenizer.ggml.eos_token_id\":128009,\"tokenizer.ggml.merges\":null,\"tokenizer.ggml.model\":\"gpt2\",\"tokenizer.ggml.pre\":\"llama-bpe\",\"tokenizer.ggml.token_type\":null,\"tokenizer.ggml.tokens\":null},\"modified_at\":\"2025-02-20T18:55:09.150577031-08:00\"}"
headers:
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 15 Jan 2025 20:47:12 GMT
- Fri, 21 Feb 2025 02:57:55 GMT
Transfer-Encoding:
- chunked
http_version: HTTP/1.1

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,236 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are base_agent. You are
a helpful assistant that just says hi\nYour personal goal is: Just say hi\nTo
give my best complete final answer to the task respond using the exact following
format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
answer must be the great and the most complete as possible, it must be outcome
described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
"content": "\nCurrent Task: Just say hi\n\nThis is the expected criteria for
your final answer: hi\nyou MUST return the actual complete content as the final
answer, not a summary.\n\nBegin! This is VERY important to you, use the tools
available and give your best Final Answer, your job depends on it!\n\nThought:"}],
"model": "gpt-4o-mini", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '838'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.8
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-B4VsaBZ4ec4b0ab4pkqWgyxTFVVfc\",\n \"object\":
\"chat.completion\",\n \"created\": 1740415556,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: hi\",\n \"refusal\": null\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
161,\n \"completion_tokens\": 12,\n \"total_tokens\": 173,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_7fcd609668\"\n}\n"
headers:
CF-RAY:
- 9170edc5da6f230e-SJC
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Mon, 24 Feb 2025 16:45:57 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=lvRw4Nyef7N35to64fj2_kHDfbZp0KSFbwgF5chYMRI-1740415557-1.0.1.1-o5BaN1FpBwv5Wq6zIlv0rCB28lk5hVI9wZQWU3pig1jgyAKDkYzTwZ0MlSR6v6TPIX9RfepjrO3.Gk3FEmcVRw;
path=/; expires=Mon, 24-Feb-25 17:15:57 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=ySaVoTQvAcQyH5QoJQJDj75e5j8HwGFPOlFMAWEvXJk-1740415557302-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '721'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999808'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_fc3b3bcd4382cddaa3c04ce7003e4857
http_version: HTTP/1.1
status_code: 200
- request:
body: '{"messages": [{"role": "system", "content": "You are Task Execution Evaluator.
Evaluator agent for crew evaluation with precise capabilities to evaluate the
performance of the agents in the crew based on the tasks they have performed\nYour
personal goal is: Your goal is to evaluate the performance of the agents in
the crew based on the tasks they have performed using score from 1 to 10 evaluating
on completion, quality, and overall performance.\nTo give my best complete final
answer to the task respond using the exact following format:\n\nThought: I now
can give a great answer\nFinal Answer: Your final answer must be the great and
the most complete as possible, it must be outcome described.\n\nI MUST use these
formats, my job depends on it!"}, {"role": "user", "content": "\nCurrent Task:
Based on the task description and the expected output, compare and evaluate
the performance of the agents in the crew based on the Task Output they have
performed using score from 1 to 10 evaluating on completion, quality, and overall
performance.task_description: Just say hi task_expected_output: hi agent: base_agent
agent_goal: Just say hi Task Output: hi\n\nThis is the expected criteria for
your final answer: Evaluation Score from 1 to 10 based on the performance of
the agents on the tasks\nyou MUST return the actual complete content as the
final answer, not a summary.\nEnsure your final answer contains only the content
in the following format: {\n \"quality\": float\n}\n\nEnsure the final output
does not include any code block markers like ```json or ```python.\n\nBegin!
This is VERY important to you, use the tools available and give your best Final
Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop":
["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '1765'
content-type:
- application/json
cookie:
- __cf_bm=lvRw4Nyef7N35to64fj2_kHDfbZp0KSFbwgF5chYMRI-1740415557-1.0.1.1-o5BaN1FpBwv5Wq6zIlv0rCB28lk5hVI9wZQWU3pig1jgyAKDkYzTwZ0MlSR6v6TPIX9RfepjrO3.Gk3FEmcVRw;
_cfuvid=ySaVoTQvAcQyH5QoJQJDj75e5j8HwGFPOlFMAWEvXJk-1740415557302-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.8
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-B4Vsbd9AsRaJ2exDtWnHAwC8rIjfi\",\n \"object\":
\"chat.completion\",\n \"created\": 1740415557,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: { \\n \\\"quality\\\": 10 \\n} \",\n \"refusal\": null\n
\ },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n
\ ],\n \"usage\": {\n \"prompt_tokens\": 338,\n \"completion_tokens\":
22,\n \"total_tokens\": 360,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n
\ \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_7fcd609668\"\n}\n"
headers:
CF-RAY:
- 9170edd15bb5230e-SJC
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Mon, 24 Feb 2025 16:45:58 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '860'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999578'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_fad452c2d10b5fc95809130912b08837
http_version: HTTP/1.1
status_code: 200
version: 1

View File

@@ -0,0 +1,103 @@
interactions:
- request:
body: '{"messages": [{"role": "user", "content": "Hello, how are you?"}], "model":
"gpt-4o-mini", "stop": []}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '102'
content-type:
- application/json
cookie:
- _cfuvid=IY8ppO70AMHr2skDSUsGh71zqHHdCQCZ3OvkPi26NBc-1740424913267-0.0.1.1-604800000;
__cf_bm=fU6K5KZoDmgcEuF8_yWAYKUO5fKHh6q5.wDPnna393g-1740424913-1.0.1.1-2iOaq3JVGWs439V0HxJee0IC9HdJm7dPkeJorD.AGw0YwkngRPM8rrTzn_7ht1BkbOauEezj.wPKcBz18gIYUg
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.8
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-B4YLA2SrC2rwdVQ3U87G5a0P5lsLw\",\n \"object\":
\"chat.completion\",\n \"created\": 1740425016,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hello! I'm just a computer program, so
I don't have feelings, but I'm here and ready to help you. How can I assist
you today?\",\n \"refusal\": null\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
13,\n \"completion_tokens\": 30,\n \"total_tokens\": 43,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_709714d124\"\n}\n"
headers:
CF-RAY:
- 9171d4c0ed44236e-SJC
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Mon, 24 Feb 2025 19:23:38 GMT
Server:
- cloudflare
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '1954'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999978'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_ea2703502b8827e4297cd2a7bae9d9c8
http_version: HTTP/1.1
status_code: 200
version: 1

View File

@@ -0,0 +1,108 @@
interactions:
- request:
body: '{"messages": [{"role": "user", "content": "Hello, how are you?"}], "model":
"gpt-4o-mini", "stop": []}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '102'
content-type:
- application/json
cookie:
- _cfuvid=GefCcEtb_Gem93E4a9Hvt3Xyof1YQZVJAXBb9I6pEUs-1739398417375-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.8
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-B4YJU8IWKGyBQtAyPDRd3SFI2flYR\",\n \"object\":
\"chat.completion\",\n \"created\": 1740424912,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Hello! I'm just a computer program, so
I don't have feelings, but I'm here and ready to help you. How can I assist
you today?\",\n \"refusal\": null\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
13,\n \"completion_tokens\": 30,\n \"total_tokens\": 43,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_7fcd609668\"\n}\n"
headers:
CF-RAY:
- 9171d230d8ed7ae0-SJC
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Mon, 24 Feb 2025 19:21:53 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=fU6K5KZoDmgcEuF8_yWAYKUO5fKHh6q5.wDPnna393g-1740424913-1.0.1.1-2iOaq3JVGWs439V0HxJee0IC9HdJm7dPkeJorD.AGw0YwkngRPM8rrTzn_7ht1BkbOauEezj.wPKcBz18gIYUg;
path=/; expires=Mon, 24-Feb-25 19:51:53 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=IY8ppO70AMHr2skDSUsGh71zqHHdCQCZ3OvkPi26NBc-1740424913267-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '993'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999978'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_d9c4d49185e97b1797061efc1e55d811
http_version: HTTP/1.1
status_code: 200
version: 1

View File

@@ -1,4 +1,5 @@
import json
import os
from typing import Dict, List, Optional
from unittest.mock import MagicMock, Mock, patch
@@ -220,10 +221,13 @@ def test_get_conversion_instructions_gpt():
supports_function_calling.return_value = True
instructions = get_conversion_instructions(SimpleModel, llm)
model_schema = PydanticSchemaParser(model=SimpleModel).get_schema()
assert (
instructions
== f"Please convert the following text into valid JSON.\n\nThe JSON should follow this schema:\n```json\n{model_schema}\n```"
expected_instructions = (
"Please convert the following text into valid JSON.\n\n"
"Output ONLY the valid JSON and nothing else.\n\n"
"The JSON must follow this schema exactly:\n```json\n"
f"{model_schema}\n```"
)
assert instructions == expected_instructions
def test_get_conversion_instructions_non_gpt():
@@ -346,12 +350,17 @@ def test_convert_with_instructions():
assert output.age == 30
@pytest.mark.vcr(filter_headers=["authorization"])
# Skip tests that call external APIs when running in CI/CD
skip_external_api = pytest.mark.skipif(
os.getenv("CI") is not None, reason="Skipping tests that call external API in CI/CD"
)
@skip_external_api
@pytest.mark.vcr(filter_headers=["authorization"], record_mode="once")
def test_converter_with_llama3_2_model():
llm = LLM(model="ollama/llama3.2:3b", base_url="http://localhost:11434")
sample_text = "Name: Alice Llama, Age: 30"
instructions = get_conversion_instructions(SimpleModel, llm)
converter = Converter(
llm=llm,
@@ -359,19 +368,17 @@ def test_converter_with_llama3_2_model():
model=SimpleModel,
instructions=instructions,
)
output = converter.to_pydantic()
assert isinstance(output, SimpleModel)
assert output.name == "Alice Llama"
assert output.age == 30
@pytest.mark.vcr(filter_headers=["authorization"])
@skip_external_api
@pytest.mark.vcr(filter_headers=["authorization"], record_mode="once")
def test_converter_with_llama3_1_model():
llm = LLM(model="ollama/llama3.1", base_url="http://localhost:11434")
sample_text = "Name: Alice Llama, Age: 30"
instructions = get_conversion_instructions(SimpleModel, llm)
converter = Converter(
llm=llm,
@@ -379,14 +386,19 @@ def test_converter_with_llama3_1_model():
model=SimpleModel,
instructions=instructions,
)
output = converter.to_pydantic()
assert isinstance(output, SimpleModel)
assert output.name == "Alice Llama"
assert output.age == 30
# Skip tests that call external APIs when running in CI/CD
skip_external_api = pytest.mark.skipif(
os.getenv("CI") is not None, reason="Skipping tests that call external API in CI/CD"
)
@skip_external_api
@pytest.mark.vcr(filter_headers=["authorization"])
def test_converter_with_nested_model():
llm = LLM(model="gpt-4o-mini")
@@ -563,7 +575,7 @@ def test_converter_with_ambiguous_input():
with pytest.raises(ConverterError) as exc_info:
output = converter.to_pydantic()
assert "validation error" in str(exc_info.value).lower()
assert "failed to convert text into a pydantic model" in str(exc_info.value).lower()
# Tests for function calling support

View File

@@ -1,6 +1,5 @@
import json
from datetime import datetime
from unittest.mock import MagicMock, patch
from unittest.mock import Mock, patch
import pytest
from pydantic import Field
@@ -9,9 +8,9 @@ from crewai.agent import Agent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.crew import Crew
from crewai.flow.flow import Flow, listen, start
from crewai.llm import LLM
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.tools.tool_usage import ToolUsage
from crewai.utilities.events.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
@@ -21,8 +20,11 @@ from crewai.utilities.events.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
CrewKickoffStartedEvent,
CrewTestCompletedEvent,
CrewTestStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.event_listener import EventListener
from crewai.utilities.events.event_types import ToolUsageFinishedEvent
from crewai.utilities.events.flow_events import (
FlowCreatedEvent,
@@ -31,6 +33,12 @@ from crewai.utilities.events.flow_events import (
MethodExecutionFailedEvent,
MethodExecutionStartedEvent,
)
from crewai.utilities.events.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMCallType,
)
from crewai.utilities.events.task_events import (
TaskCompletedEvent,
TaskFailedEvent,
@@ -52,26 +60,35 @@ base_task = Task(
expected_output="hi",
agent=base_agent,
)
event_listener = EventListener()
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_start_kickoff_event():
received_events = []
mock_span = Mock()
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(CrewKickoffStartedEvent)
def handle_crew_start(source, event):
received_events.append(event)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
@crewai_event_bus.on(CrewKickoffStartedEvent)
def handle_crew_start(source, event):
received_events.append(event)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
with (
patch.object(
event_listener._telemetry, "crew_execution_span", return_value=mock_span
) as mock_crew_execution_span,
patch.object(
event_listener._telemetry, "end_crew", return_value=mock_span
) as mock_crew_ended,
):
crew.kickoff()
mock_crew_execution_span.assert_called_once_with(crew, None)
mock_crew_ended.assert_called_once_with(crew, "hi")
assert len(received_events) == 1
assert received_events[0].crew_name == "TestCrew"
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "crew_kickoff_started"
assert len(received_events) == 1
assert received_events[0].crew_name == "TestCrew"
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "crew_kickoff_started"
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -92,6 +109,45 @@ def test_crew_emits_end_kickoff_event():
assert received_events[0].type == "crew_kickoff_completed"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_test_kickoff_type_event():
received_events = []
mock_span = Mock()
@crewai_event_bus.on(CrewTestStartedEvent)
def handle_crew_end(source, event):
received_events.append(event)
@crewai_event_bus.on(CrewTestCompletedEvent)
def handle_crew_test_end(source, event):
received_events.append(event)
eval_llm = LLM(model="gpt-4o-mini")
with (
patch.object(
event_listener._telemetry, "test_execution_span", return_value=mock_span
) as mock_crew_execution_span,
):
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
crew.test(n_iterations=1, eval_llm=eval_llm)
# Verify the call was made with correct argument types and values
assert mock_crew_execution_span.call_count == 1
args = mock_crew_execution_span.call_args[0]
assert isinstance(args[0], Crew)
assert args[1] == 1
assert args[2] is None
assert args[3] == eval_llm
assert len(received_events) == 2
assert received_events[0].crew_name == "TestCrew"
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "crew_test_started"
assert received_events[1].crew_name == "TestCrew"
assert isinstance(received_events[1].timestamp, datetime)
assert received_events[1].type == "crew_test_completed"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_emits_kickoff_failed_event():
received_events = []
@@ -142,9 +198,20 @@ def test_crew_emits_end_task_event():
def handle_task_end(source, event):
received_events.append(event)
mock_span = Mock()
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
with (
patch.object(
event_listener._telemetry, "task_started", return_value=mock_span
) as mock_task_started,
patch.object(
event_listener._telemetry, "task_ended", return_value=mock_span
) as mock_task_ended,
):
crew.kickoff()
crew.kickoff()
mock_task_started.assert_called_once_with(crew=crew, task=base_task)
mock_task_ended.assert_called_once_with(mock_span, base_task, crew)
assert len(received_events) == 1
assert isinstance(received_events[0].timestamp, datetime)
@@ -334,24 +401,29 @@ def test_tools_emits_error_events():
def test_flow_emits_start_event():
received_events = []
mock_span = Mock()
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event):
received_events.append(event)
@crewai_event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event):
received_events.append(event)
class TestFlow(Flow[dict]):
@start()
def begin(self):
return "started"
class TestFlow(Flow[dict]):
@start()
def begin(self):
return "started"
with (
patch.object(
event_listener._telemetry, "flow_execution_span", return_value=mock_span
) as mock_flow_execution_span,
):
flow = TestFlow()
flow.kickoff()
assert len(received_events) == 1
assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "flow_started"
mock_flow_execution_span.assert_called_once_with("TestFlow", ["begin"])
assert len(received_events) == 1
assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "flow_started"
def test_flow_emits_finish_event():
@@ -455,6 +527,7 @@ def test_multiple_handlers_for_same_event():
def test_flow_emits_created_event():
received_events = []
mock_span = Mock()
@crewai_event_bus.on(FlowCreatedEvent)
def handle_flow_created(source, event):
@@ -465,8 +538,15 @@ def test_flow_emits_created_event():
def begin(self):
return "started"
flow = TestFlow()
flow.kickoff()
with (
patch.object(
event_listener._telemetry, "flow_creation_span", return_value=mock_span
) as mock_flow_creation_span,
):
flow = TestFlow()
flow.kickoff()
mock_flow_creation_span.assert_called_once_with("TestFlow")
assert len(received_events) == 1
assert received_events[0].flow_name == "TestFlow"
@@ -495,3 +575,43 @@ def test_flow_emits_method_execution_failed_event():
assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "method_execution_failed"
assert received_events[0].error == error
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_emits_call_started_event():
received_events = []
@crewai_event_bus.on(LLMCallStartedEvent)
def handle_llm_call_started(source, event):
received_events.append(event)
@crewai_event_bus.on(LLMCallCompletedEvent)
def handle_llm_call_completed(source, event):
received_events.append(event)
llm = LLM(model="gpt-4o-mini")
llm.call("Hello, how are you?")
assert len(received_events) == 2
assert received_events[0].type == "llm_call_started"
assert received_events[1].type == "llm_call_completed"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_emits_call_failed_event():
received_events = []
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_call_failed(source, event):
received_events.append(event)
error_message = "Simulated LLM call failure"
with patch("crewai.llm.litellm.completion", side_effect=Exception(error_message)):
llm = LLM(model="gpt-4o-mini")
with pytest.raises(Exception) as exc_info:
llm.call("Hello, how are you?")
assert str(exc_info.value) == error_message
assert len(received_events) == 1
assert received_events[0].type == "llm_call_failed"
assert received_events[0].error == error_message

View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python
"""
Test module for token tracking functionality in CrewAI.
This tests both direct LangChain models and LiteLLM integration.
"""
import os
from typing import Any, Dict
from unittest.mock import MagicMock, patch
import pytest
from langchain_core.tools import Tool
from langchain_openai import ChatOpenAI
from crewai import Crew, Process, Task
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.agents.langchain_agent_adapter import LangChainAgentAdapter
from crewai.utilities.token_counter_callback import (
LangChainTokenCounter,
LiteLLMTokenCounter,
)
def get_weather(location: str = "San Francisco"):
"""Simulates fetching current weather data for a given location."""
# In a real implementation, you could replace this with an API call.
return f"Current weather in {location}: Sunny, 25°C"
class TestTokenTracking:
"""Test suite for token tracking functionality."""
@pytest.fixture
def weather_tool(self):
"""Create a simple weather tool for testing."""
return Tool(
name="Weather",
func=get_weather,
description="Useful for fetching current weather information for a given location.",
)
@pytest.fixture
def mock_openai_response(self):
"""Create a mock OpenAI response with token usage information."""
return {
"usage": {
"prompt_tokens": 100,
"completion_tokens": 50,
"total_tokens": 150,
}
}
def test_token_process_basic(self):
"""Test basic functionality of TokenProcess class."""
token_process = TokenProcess()
# Test adding prompt tokens
token_process.sum_prompt_tokens(100)
assert token_process.prompt_tokens == 100
# Test adding completion tokens
token_process.sum_completion_tokens(50)
assert token_process.completion_tokens == 50
# Test adding successful requests
token_process.sum_successful_requests(1)
assert token_process.successful_requests == 1
# Test getting summary
summary = token_process.get_summary()
assert summary.prompt_tokens == 100
assert summary.completion_tokens == 50
assert summary.total_tokens == 150
assert summary.successful_requests == 1
@patch("litellm.completion")
def test_litellm_token_counter(self, mock_completion):
"""Test LiteLLMTokenCounter with a mock response."""
# Setup
token_process = TokenProcess()
counter = LiteLLMTokenCounter(token_process)
# Mock the response
mock_completion.return_value = {
"usage": {
"prompt_tokens": 100,
"completion_tokens": 50,
}
}
# Simulate a successful LLM call
counter.log_success_event(
kwargs={},
response_obj=mock_completion.return_value,
start_time=0,
end_time=1,
)
# Verify token counts were updated
assert token_process.prompt_tokens == 100
assert token_process.completion_tokens == 50
assert token_process.successful_requests == 1
def test_langchain_token_counter(self):
"""Test LangChainTokenCounter with a mock response."""
# Setup
token_process = TokenProcess()
counter = LangChainTokenCounter(token_process)
# Create a mock LangChain response
mock_response = MagicMock()
mock_response.llm_output = {
"token_usage": {
"prompt_tokens": 100,
"completion_tokens": 50,
}
}
# Simulate a successful LLM call
counter.on_llm_end(mock_response)
# Verify token counts were updated
assert token_process.prompt_tokens == 100
assert token_process.completion_tokens == 50
assert token_process.successful_requests == 1
@pytest.mark.skipif(
not os.environ.get("OPENAI_API_KEY"),
reason="OPENAI_API_KEY environment variable not set",
)
def test_langchain_agent_adapter_token_tracking(self, weather_tool):
"""
Integration test for token tracking with LangChainAgentAdapter.
This test requires an OpenAI API key.
"""
# Skip if LangGraph is not installed
try:
from langgraph.prebuilt import ToolNode
except ImportError:
pytest.skip("LangGraph is not installed. Install it with: uv add langgraph")
# Initialize a ChatOpenAI model
llm = ChatOpenAI(model="gpt-4o")
# Create a LangChainAgentAdapter with the direct LLM
agent = LangChainAgentAdapter(
langchain_agent=llm,
tools=[weather_tool],
role="Weather Agent",
goal="Provide current weather information for the requested location.",
backstory="An expert weather provider that fetches current weather information using simulated data.",
verbose=True,
)
# Create a weather task for the agent
task = Task(
description="Fetch the current weather for San Francisco.",
expected_output="A weather report showing current conditions in San Francisco.",
agent=agent,
)
# Create a crew with the single agent and task
crew = Crew(
agents=[agent],
tasks=[task],
verbose=True,
process=Process.sequential,
)
# Execute the crew
result = crew.kickoff()
# Verify token usage was tracked
assert result.token_usage is not None
assert result.token_usage.total_tokens > 0
assert result.token_usage.prompt_tokens > 0
assert result.token_usage.completion_tokens > 0
assert result.token_usage.successful_requests > 0
# Also verify token usage directly from the agent
usage = agent.token_process.get_summary()
assert usage.prompt_tokens > 0
assert usage.completion_tokens > 0
assert usage.total_tokens > 0
assert usage.successful_requests > 0
if __name__ == "__main__":
pytest.main(["-xvs", __file__])