Compare commits

...

52 Commits

Author SHA1 Message Date
Brandon Hancock
6efee89399 refactor 2025-02-28 12:30:30 -05:00
Brandon Hancock
75d8e086a4 Refactor token tracking: Remove token_cost_process parameter for cleaner code 2025-02-28 12:17:14 -05:00
Brandon Hancock
ef48cbe971 Fix token tracking in Agent class to use token_process instead of _token_process 2025-02-28 11:44:51 -05:00
Brandon Hancock
88d8079dcd Fix token tracking in LangChainAgentAdapter and refactor token_process attribute to be public 2025-02-28 11:31:08 -05:00
Brandon Hancock
33ef612cd5 Merge branch 'main' into brandon/bring-back-byoa 2025-02-28 09:26:34 -05:00
Brandon Hancock (bhancock_ai)
6c81acac00 Update docs (#2226) 2025-02-27 13:20:44 -05:00
Brandon Hancock (bhancock_ai)
2181166a62 Improve extract thought (#2223)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:44 -05:00
Brandon Hancock (bhancock_ai)
3f17789152 Support multiple router calls and address issue #2175 (#2231)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:44 -05:00
Brandon Hancock (bhancock_ai)
ed0b8e1563 Fix type issue (#2224)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:44 -05:00
Brandon Hancock (bhancock_ai)
4915420ed2 Add support for python 3.10 (#2230) 2025-02-27 13:20:44 -05:00
Lorenze Jay
271faa917b feat: Enhance agent knowledge setup with optional crew embedder (#2232)
- Modify `Agent` class to add `set_knowledge` method
- Allow setting embedder from crew-level configuration
- Remove `_set_knowledge` method from initialization
- Update `Crew` class to set agent knowledge during agent setup
- Add default implementation in `BaseAgent` for compatibility
2025-02-27 13:20:42 -05:00
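The commit above adds a set_knowledge(crew_embedder=...) hook so an agent without its own embedder can inherit the crew's. Below is a minimal, framework-free sketch of that fallback pattern; the class names and attributes are illustrative stand-ins, and only the set_knowledge/crew_embedder signature comes from the BaseAgent diff further down.

from typing import Any, Dict, Optional

class KnowledgeStore:
    """Illustrative stand-in for an agent knowledge source."""
    def __init__(self, sources: list, embedder: Optional[Dict[str, Any]] = None):
        self.sources = sources
        self.embedder = embedder  # embedding configuration actually used

class SketchAgent:
    """Hypothetical agent showing the crew-embedder fallback."""
    def __init__(self, knowledge_sources: Optional[list] = None,
                 embedder: Optional[Dict[str, Any]] = None):
        self.knowledge_sources = knowledge_sources or []
        self.embedder = embedder  # agent-level embedder config, may be None
        self.knowledge: Optional[KnowledgeStore] = None

    def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None) -> None:
        # Prefer the agent's own embedder; otherwise fall back to the crew's.
        effective = self.embedder or crew_embedder
        if self.knowledge_sources:
            self.knowledge = KnowledgeStore(self.knowledge_sources, embedder=effective)

class SketchCrew:
    """Hypothetical crew that wires knowledge during agent setup."""
    def __init__(self, agents: list, embedder: Optional[Dict[str, Any]] = None):
        self.agents = agents
        self.embedder = embedder
        for agent in self.agents:
            agent.set_knowledge(crew_embedder=self.embedder)

if __name__ == "__main__":
    a = SketchAgent(knowledge_sources=["handbook.md"])
    SketchCrew(agents=[a], embedder={"provider": "openai", "config": {"model": "text-embedding-3-small"}})
    print(a.knowledge.embedder)  # falls back to the crew-level embedder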
Fernando Galves
e87eb57edc Update the constants.py file adding the list of foundation models available in Amazon Bedrock (#2170)
* Update constants.py

This PR updates the list of foundation models available in Amazon Bedrock to reflect the latest offerings.

* Update constants.py with inference profiles

Add the cross-region inference profiles to increase throughput and improve resiliency by routing your requests across multiple AWS Regions during peak utilization bursts.

* Update constants.py

Fix the model order

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
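For orientation, here is a hypothetical sketch of the kind of provider-to-model mapping such a constants.py change touches. The mapping shape and the model IDs below are illustrative examples only, not the exact list added in PR #2170; the "us." prefix illustrates the cross-region inference profiles described above.

# Hypothetical shape of a provider -> model-id mapping; entries are examples only.
PROVIDER_MODELS = {
    "bedrock": [
        # Foundation models (region-specific IDs)
        "bedrock/anthropic.claude-3-5-sonnet-20240620-v1:0",
        "bedrock/meta.llama3-1-70b-instruct-v1:0",
        # Cross-region inference profiles: the "us." prefix lets Bedrock route
        # requests across several AWS Regions during peak utilization bursts.
        "bedrock/us.anthropic.claude-3-5-sonnet-20240620-v1:0",
        "bedrock/us.meta.llama3-1-70b-instruct-v1:0",
    ],
}

def models_for(provider: str) -> list[str]:
    """Return the model IDs registered for a provider (hypothetical helper)."""
    return PROVIDER_MODELS.get(provider, [])

if __name__ == "__main__":
    print("\n".join(models_for("bedrock")))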
devin-ai-integration[bot]
e6e4bb15d7 feat: add context window size for o3-mini model (#2192)
* feat: add context window size for o3-mini model

Fixes #2191

Co-Authored-By: Joe Moura <joao@crewai.com>

* feat: add context window validation and tests

- Add validation for context window size bounds (1024-2097152)
- Add test for context window validation
- Fix test import error

Co-Authored-By: Joe Moura <joao@crewai.com>

* style: fix import sorting in llm_test.py

Co-Authored-By: Joe Moura <joao@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
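A minimal sketch of the bounds check that commit describes. Only the 1024-2097152 range comes from the commit message; the dictionary, function name, and the 200,000-token figure for o3-mini are assumptions for illustration.

MIN_CONTEXT_WINDOW = 1024
MAX_CONTEXT_WINDOW = 2097152  # 2 ** 21, the upper bound named in the commit

LLM_CONTEXT_WINDOW_SIZES = {
    "o3-mini": 200000,  # illustrative value for the model the PR adds
}

def validated_context_window(model: str, default: int = 8192) -> int:
    size = LLM_CONTEXT_WINDOW_SIZES.get(model, default)
    if not (MIN_CONTEXT_WINDOW <= size <= MAX_CONTEXT_WINDOW):
        raise ValueError(
            f"context window size {size} for {model!r} must be between "
            f"{MIN_CONTEXT_WINDOW} and {MAX_CONTEXT_WINDOW}"
        )
    return size

if __name__ == "__main__":
    print(validated_context_window("o3-mini"))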
Brandon Hancock (bhancock_ai)
92a3349d64 incorporating fix from @misrasaurabh1 with additional type fix (#2213)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Shivtej Narake
940f0647a9 [MINOR] support ChatOllama from langchain_ollama (#2158)

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Vidit Ostwal
5db512bef6 Fixed the issue 2123 around memory command with CLI (#2155)
* Fixed the issue 2123 around memory command with CLI

* Fixed typo, added the recommendations

* Fixed Typo

* Fixed lint issue

* Fixed the print statement to include path as well

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Victor Degliame
48d2b8c320 fix: typo in 'delegate_work' and 'ask_question' prompts (#2144)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
nikolaidk
f6b0b492a4 Update kickoff-async.mdx (#2138)
Missing mandatory field expected_output on task in example

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
4b63b29787 Revert "feat: add prompt observability code (#2027)" (#2211)
* Revert "feat: add prompt observability code (#2027)"

This reverts commit 90f1bee602.

* Fix issues with flows post merge

* Decoupling telemetry and ensure tests  (#2212)

* feat: Enhance event listener and telemetry tracking

- Update event listener to improve telemetry span handling
- Add execution_span field to Task for better tracing
- Modify event handling in EventListener to use new span tracking
- Remove debug print statements
- Improve test coverage for crew and flow events
- Update cassettes to reflect new event tracking behavior

* Remove telemetry references from Crew class

- Remove Telemetry import and initialization from Crew class
- Delete _telemetry attribute from class configuration
- Clean up unused telemetry-related code

* test: Improve crew verbose output test with event log filtering

- Filter out event listener logs in verbose output test
- Ensure no output when verbose is set to False
- Enhance test coverage for crew logging behavior

* dropped comment

* refactor: Improve telemetry span tracking in EventListener

- Remove `execution_span` from Task class
- Add `execution_spans` dictionary to EventListener to track spans
- Update task event handlers to use new span tracking mechanism
- Simplify span management across task lifecycle events

* lint

* Fix failing test

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Lorenze Jay
860efc3b42 Decoupling telemetry and ensure tests (#2212)
* feat: Enhance event listener and telemetry tracking

- Update event listener to improve telemetry span handling
- Add execution_span field to Task for better tracing
- Modify event handling in EventListener to use new span tracking
- Remove debug print statements
- Improve test coverage for crew and flow events
- Update cassettes to reflect new event tracking behavior

* Remove telemetry references from Crew class

- Remove Telemetry import and initialization from Crew class
- Delete _telemetry attribute from class configuration
- Clean up unused telemetry-related code

* test: Improve crew verbose output test with event log filtering

- Filter out event listener logs in verbose output test
- Ensure no output when verbose is set to False
- Enhance test coverage for crew logging behavior

* dropped comment

* refactor: Improve telemetry span tracking in EventListener

- Remove `execution_span` from Task class
- Add `execution_spans` dictionary to EventListener to track spans
- Update task event handlers to use new span tracking mechanism
- Simplify span management across task lifecycle events

* lint
2025-02-27 13:20:01 -05:00
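A framework-free sketch of the span-tracking pattern both telemetry commits describe: the listener keeps an execution_spans mapping keyed by task, opens a span when a task starts, and closes it when the task finishes. Class, method, and span names here are illustrative, not the exact EventListener code.

from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class Span:
    name: str
    attributes: Dict[str, Any] = field(default_factory=dict)
    def end(self) -> None:
        print(f"span ended: {self.name}")

class SketchEventListener:
    def __init__(self) -> None:
        self.execution_spans: Dict[Any, Span] = {}

    def on_task_started(self, task: Any) -> None:
        # One span per in-flight task, so concurrent tasks do not clobber each other.
        self.execution_spans[task] = Span(name=f"task:{task}")

    def on_task_completed(self, task: Any) -> None:
        span = self.execution_spans.pop(task, None)
        if span is not None:
            span.end()

if __name__ == "__main__":
    listener = SketchEventListener()
    listener.on_task_started("research")
    listener.on_task_completed("research")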
Lorenze Jay
70ab4ad003 feat: Add LLM call events for improved observability (#2214)
* feat: Add LLM call events for improved observability

- Introduce new LLM call events: LLMCallStartedEvent, LLMCallCompletedEvent, and LLMCallFailedEvent
- Emit events for LLM calls and tool calls to provide better tracking and debugging
- Add event handling in the LLM class to track call lifecycle
- Update event bus to support new LLM-related events
- Add test cases to validate LLM event emissions

* feat: Add event handling for LLM call lifecycle events

- Implement event listeners for LLM call events in EventListener
- Add logging for LLM call start, completion, and failure events
- Import and register new LLM-specific event types

* less log

* refactor: Update LLM event response type to support Any

* refactor: Simplify LLM call completed event emission

Remove unnecessary LLMCallType conversion when emitting LLMCallCompletedEvent

* refactor: Update LLM event docstrings for clarity

Improve docstrings for LLM call events to more accurately describe their purpose and lifecycle

* feat: Add LLMCallFailedEvent emission for tool execution errors

Enhance error handling by emitting a specific event when tool execution fails during LLM calls
2025-02-27 13:20:01 -05:00
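A toy sketch of the call lifecycle this PR wires up, using a stand-in event bus rather than crewAI's real one; only the three event class names come from the commit message above, everything else is a simplifying assumption.

from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, DefaultDict, List, Type

@dataclass
class LLMCallStartedEvent:
    messages: Any

@dataclass
class LLMCallCompletedEvent:
    response: Any  # typed as Any, matching the "support Any" follow-up above

@dataclass
class LLMCallFailedEvent:
    error: str

class ToyEventBus:
    def __init__(self) -> None:
        self._handlers: DefaultDict[Type, List[Callable]] = defaultdict(list)

    def on(self, event_type: Type) -> Callable:
        def register(handler: Callable) -> Callable:
            self._handlers[event_type].append(handler)
            return handler
        return register

    def emit(self, event: Any) -> None:
        for handler in self._handlers[type(event)]:
            handler(event)

bus = ToyEventBus()

@bus.on(LLMCallCompletedEvent)
def log_completion(event: LLMCallCompletedEvent) -> None:
    print("LLM call completed:", event.response)

def provider_call(messages: Any) -> str:
    return "stub response"  # stand-in for the real provider round trip

def call_llm(messages: Any) -> Any:
    bus.emit(LLMCallStartedEvent(messages=messages))
    try:
        response = provider_call(messages)
    except Exception as exc:
        # Failures get their own event before the exception propagates.
        bus.emit(LLMCallFailedEvent(error=str(exc)))
        raise
    bus.emit(LLMCallCompletedEvent(response=response))
    return response

if __name__ == "__main__":
    call_llm([{"role": "user", "content": "hi"}])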
Brandon Hancock (bhancock_ai)
6fb25a1af7 fix reset memory issue (#2182)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
ca9277ae4c Better support async flows (#2193)
* Better support async

* Drop coroutine
2025-02-27 13:20:01 -05:00
Jannik Maierhöfer
e58e544304 docs: add header image to langfuse guide (#2128)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
João Moura
84e0a9e686 cassettes 2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
6e0e9b30fe drop prints (#2181) 2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
f6393fd088 Check the right property for tool calling (#2160)
* Check the right property

* Fix failing tests

* Update cassettes

* Update cassettes again

* Update cassettes again 2

* Update cassettes again 3

* fix other test that fails in ci/cd

* Fix issues pointed out by lorenze
2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
64804682fc improve HITL (#2169)
* improve HITL

* fix failing test

* fix failing test part 2

* Drop extra logs that were causing confusion

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Lorenze Jay
cc7669ab39 WIP crew events emitter (#2048)
* WIP crew events emitter

* Refactor event handling and introduce new event types

- Migrate from global `emit` function to `event_bus.emit`
- Add new event types for task failures, tool usage, and agent execution
- Update event listeners and event bus to support more granular event tracking
- Remove deprecated event emission methods
- Improve event type consistency and add more detailed event information

* Add event emission for agent execution lifecycle

- Emit AgentExecutionStarted and AgentExecutionError events
- Update CrewAgentExecutor to use event_bus for tracking agent execution
- Refactor error handling to include event emission
- Minor code formatting improvements in task.py and crew_agent_executor.py
- Fix a typo in test file

* Refactor event system and add third-party event listeners

- Move event_bus import to correct module paths
- Introduce BaseEventListener abstract base class
- Add AgentOpsListener for third-party event tracking
- Update event listener initialization and setup
- Clean up event-related imports and exports

* Enhance event system type safety and error handling

- Improve type annotations for event bus and event types
- Add null checks for agent and task in event emissions
- Update import paths for base tool and base agent
- Refactor event listener type hints
- Remove unnecessary print statements
- Update test configurations to match new event handling

* Refactor event classes to improve type safety and naming consistency

- Rename event classes to have explicit 'Event' suffix (e.g., TaskStartedEvent)
- Update import statements and references across multiple files
- Remove deprecated events.py module
- Enhance event type hints and configurations
- Clean up unnecessary event-related code

* Add default model for CrewEvaluator and fix event import order

- Set default model to "gpt-4o-mini" in CrewEvaluator when no model is specified
- Reorder event-related imports in task.py to follow standard import conventions
- Update event bus initialization method return type hint
- Export event_bus in events/__init__.py

* Fix tool usage and event import handling

- Update tool usage to use `.get()` method when checking tool name
- Remove unnecessary `__all__` export list in events/__init__.py

* Refactor Flow and Agent event handling to use event_bus

- Remove `event_emitter` from Flow class and replace with `event_bus.emit()`
- Update Flow and Agent tests to use event_bus event listeners
- Remove redundant event emissions in Flow methods
- Add debug print statements in Flow execution
- Simplify event tracking in test cases

* Enhance event handling for Crew, Task, and Event classes

- Add crew name to failed event types (CrewKickoffFailedEvent, CrewTrainFailedEvent, CrewTestFailedEvent)
- Update Task events to remove redundant task and context attributes
- Refactor EventListener to use Logger for consistent event logging
- Add new event types for Crew train and test events
- Improve event bus event tracking in test cases

* Remove telemetry and tracing dependencies from Task and Flow classes

- Remove telemetry-related imports and private attributes from Task class
- Remove `_telemetry` attribute from Flow class
- Update event handling to emit events without direct telemetry tracking
- Simplify task and flow execution by removing explicit telemetry spans
- Move telemetry-related event handling to EventListener

* Clean up unused imports and event-related code

- Remove unused imports from various event and flow-related files
- Reorder event imports to follow standard conventions
- Remove unnecessary event type references
- Simplify import statements in event and flow modules

* Update crew test to validate verbose output and kickoff_for_each method

- Enhance test_crew_verbose_output to check specific listener log messages
- Modify test_kickoff_for_each_invalid_input to use Pydantic validation error
- Improve test coverage for crew logging and input validation

* Update crew test verbose output with improved emoji icons

- Replace task and agent completion icons from 👍 to 
- Enhance readability of test output logging
- Maintain consistent test coverage for crew verbose output

* Add MethodExecutionFailedEvent to handle flow method execution failures

- Introduce new MethodExecutionFailedEvent in flow_events module
- Update Flow class to catch and emit method execution failures
- Add event listener for method execution failure events
- Update event-related imports to include new event type
- Enhance test coverage for method execution failure handling

* Propagate method execution failures in Flow class

- Modify Flow class to re-raise exceptions after emitting MethodExecutionFailedEvent
- Reorder MethodExecutionFailedEvent import to maintain consistent import style

* Enable test coverage for Flow method execution failure event

- Uncomment pytest.raises() in test_events to verify exception handling
- Ensure test validates MethodExecutionFailedEvent emission during flow kickoff

* Add event handling for tool usage events

- Introduce event listeners for ToolUsageFinishedEvent and ToolUsageErrorEvent
- Log tool usage events with descriptive emoji icons ( and )
- Update event_listener to track and log tool usage lifecycle

* Reorder and clean up event imports in event_listener

- Reorganize imports for tool usage events and other event types
- Maintain consistent import ordering and remove unused imports
- Ensure clean and organized import structure in event_listener module

* moving to dedicated eventlistener

* dont forget crew level

* Refactor AgentOps event listener for crew-level tracking

- Modify AgentOpsListener to handle crew-level events
- Initialize and end AgentOps session at crew kickoff and completion
- Create agents for each crew member during session initialization
- Improve session management and event recording
- Clean up and simplify event handling logic

* Update test_events to validate tool usage error event handling

- Modify test to assert single error event with correct attributes
- Use pytest.raises() to verify error event generation
- Simplify error event validation in test case

* Improve AgentOps listener type hints and formatting

- Add string type hints for AgentOps classes to resolve potential import issues
- Clean up unnecessary whitespace and improve code indentation
- Simplify initialization and event handling logic

* Update test_events to validate multiple tool usage events

- Modify test to assert 75 events instead of a single error event
- Remove pytest.raises() check, allowing crew kickoff to complete
- Adjust event validation to support broader event tracking

* Rename event_bus to crewai_event_bus for improved clarity and specificity

- Replace all references to `event_bus` with `crewai_event_bus`
- Update import statements across multiple files
- Remove the old `event_bus.py` file
- Maintain existing event handling functionality

* Enhance EventListener with singleton pattern and color configuration

- Implement singleton pattern for EventListener to ensure single instance
- Add default color configuration using EMITTER_COLOR from constants
- Modify log method calls to use default color and remove redundant color parameters
- Improve initialization logic to prevent multiple initializations

* Add FlowPlotEvent and update event bus to support flow plotting

- Introduce FlowPlotEvent to track flow plotting events
- Replace Telemetry method with event bus emission in Flow.plot()
- Update event bus to support new FlowPlotEvent type
- Add test case to validate flow plotting event emission

* Remove RunType enum and clean up crew events module

- Delete unused RunType enum from crew_events.py
- Simplify crew_events.py by removing unnecessary enum definition
- Improve code clarity by removing unneeded imports

* Enhance event handling for tool usage and agent execution

- Add new events for tool usage: ToolSelectionErrorEvent, ToolValidateInputErrorEvent
- Improve error tracking and event emission in ToolUsage and LLM classes
- Update AgentExecutionStartedEvent to use task_prompt instead of inputs
- Add comprehensive test coverage for new event types and error scenarios

* Refactor event system and improve crew testing

- Extract base CrewEvent class to a new base_events.py module
- Update event imports across multiple event-related files
- Modify CrewTestStartedEvent to use eval_llm instead of openai_model_name
- Add LLM creation validation in crew testing method
- Improve type handling and event consistency

* Refactor task events to use base CrewEvent

- Move CrewEvent import from crew_events to base_events
- Remove unnecessary blank lines in task_events.py
- Simplify event class structure for task-related events

* Update AgentExecutionStartedEvent to use task_prompt

- Modify test_events.py to use task_prompt instead of inputs
- Simplify event input validation in test case
- Align with recent event system refactoring

* Improve type hinting for TaskCompletedEvent handler

- Add explicit type annotation for TaskCompletedEvent in event_listener.py
- Enhance type safety for event handling in EventListener

* Improve test_validate_tool_input_invalid_input with mock objects

- Add explicit mock objects for agent and action in test case
- Ensure proper string values for mock agent and action attributes
- Simplify test setup for ToolUsage validation method

* Remove ToolUsageStartedEvent emission in tool usage process

- Remove unnecessary event emission for tool usage start
- Simplify tool usage event handling
- Eliminate redundant event data preparation step

* refactor: clean up and organize imports in llm and flow modules

* test: Improve flow persistence test cases and logging
2025-02-27 13:20:01 -05:00
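Two details from this PR, restated as a self-contained sketch: a BaseEventListener whose subclasses register handlers in setup_listeners(), and a singleton EventListener with a default emitter color. The bus API, method names, and color value are simplifying assumptions, not the exact crewai_event_bus interface.

from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Type

class ToyEventBus:
    def __init__(self) -> None:
        self._handlers: DefaultDict[Type, list] = defaultdict(list)

    def on(self, event_type: Type) -> Callable:
        def register(handler: Callable) -> Callable:
            self._handlers[event_type].append(handler)
            return handler
        return register

    def emit(self, event: Any) -> None:
        for handler in self._handlers[type(event)]:
            handler(event)

crewai_event_bus = ToyEventBus()  # stand-in for the renamed, module-level bus

class CrewKickoffStartedEvent:  # event classes now carry an explicit "Event" suffix
    pass

class BaseEventListener(ABC):
    """Subclasses register their handlers against the bus in setup_listeners()."""

    def __init__(self) -> None:
        self.setup_listeners(crewai_event_bus)

    @abstractmethod
    def setup_listeners(self, bus: ToyEventBus) -> None:
        ...

EMITTER_COLOR = "bold_blue"  # illustrative default color constant

class EventListener(BaseEventListener):
    _instance = None

    def __new__(cls):
        # Singleton: every import sees the same listener instance.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self) -> None:
        if getattr(self, "_initialized", False):
            return  # avoid registering handlers twice on repeated construction
        super().__init__()
        self._initialized = True

    def setup_listeners(self, bus: ToyEventBus) -> None:
        @bus.on(CrewKickoffStartedEvent)
        def on_kickoff(event: CrewKickoffStartedEvent) -> None:
            print(f"[{EMITTER_COLOR}] crew kickoff started")

if __name__ == "__main__":
    EventListener()
    crewai_event_bus.emit(CrewKickoffStartedEvent())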
João Moura
377b64ac81 making flow verbose false by default 2025-02-27 13:20:01 -05:00
Tony Kipkemboi
470254c3e2 docs: update accordions and fix layout (#2110)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Vini Brasil
497190f823 Implement flow.state_utils.to_string method and improve types (#2161) 2025-02-27 13:20:01 -05:00
Eduardo Chiarotti
241adb8ed0 feat: add prompt observability code (#2027)
* feat: add prompt observability code

* feat: improve logic for llm call

* feat: add tests for traces

* feat: remove unused import

* feat: add function to clear and add task traces

* feat: fix import

* feat: change time

* feat: fix type checking issues

* feat: add fixed time to fix test

* feat: fix datetime test issue

* feat: add add task traces function

* feat: add same logic as entp

* feat: add start_time as reference for duplication of tool call

* feat: add max_depth

* feat: add protocols file to properly import on LLM

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Brandon Hancock (bhancock_ai)
e60c6e66a4 Bugfix/fix backtick in agent response (#2159)
* updating prompts

* fix issue

* clean up thoughts as well

* drop trailing set
2025-02-27 13:20:01 -05:00
sharmasundip
afd01e3c0c fix user memory config issue (#2086)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Vidit Ostwal
81b8ae0abd Added functionality to have any llm run test functionality (#2071)
* Added functionality to have any llm run test functionality

* Fixed lint issues

* Fixed Linting issues

* Fixed unit test case

* Fixed unit test

* Fixed test case

* Fixed unit test case

---------

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-27 13:20:01 -05:00
Vini Brasil
ae19437473 Implement Flow state export method (#2134)
This commit implements a method for exporting the state of a flow into a
JSON-serializable dictionary.

The idea is producing a human-readable version of state that can be
inspected or consumed by other systems, hence JSON and not pickling or
marshalling.

I consider it an export because it's a one-way process, meaning it
cannot be loaded back into Python because of complex types.
2025-02-27 13:20:01 -05:00
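An illustrative sketch of that one-way export: walk the state, coerce everything to JSON-safe primitives, and fall back to str() for complex types, which is why the result cannot be loaded back. The function names are hypothetical, not the exact crewai.flow.state_utils API.

import json
from datetime import date, datetime
from typing import Any

from pydantic import BaseModel

def to_serializable(value: Any) -> Any:
    if isinstance(value, (str, int, float, bool)) or value is None:
        return value
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, BaseModel):
        return to_serializable(value.model_dump())
    if isinstance(value, dict):
        return {str(k): to_serializable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [to_serializable(v) for v in value]
    return str(value)  # lossy fallback for anything else

def to_string(state: Any) -> str:
    """Human-readable JSON version of a flow's state."""
    return json.dumps(to_serializable(state), indent=2)

if __name__ == "__main__":
    class DemoState(BaseModel):
        counter: int = 3
        started_at: datetime = datetime(2025, 2, 27, 13, 20)

    print(to_string(DemoState()))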
Brandon Hancock (bhancock_ai)
f8c74b4fbb Merge branch 'main' into brandon/bring-back-byoa 2025-02-14 15:57:16 -05:00
Brandon Hancock
134e7ab241 clean up RPM controller 2025-02-14 15:32:34 -05:00
Brandon Hancock
74e63621a5 Fix new errors 2025-02-14 15:29:20 -05:00
Brandon Hancock
8953af6133 Fix ruff issues 2025-02-14 15:24:29 -05:00
Brandon Hancock
2b438baad4 Fix issues 2025-02-14 15:11:58 -05:00
Brandon Hancock
185556b7e3 WIP 2025-02-14 11:40:23 -05:00
Brandon Hancock
1e23d37a14 more fixes 2025-02-13 15:11:51 -05:00
Brandon Hancock
df21f01441 more fixes 2025-02-13 15:08:29 -05:00
Brandon Hancock
b957fc1a18 More type fixes 2025-02-13 15:06:16 -05:00
Brandon Hancock
fd0e1bdd1a Fix more type issues 2025-02-13 15:03:17 -05:00
Brandon Hancock
265b37316b fix type issues 2025-02-13 15:00:27 -05:00
Brandon Hancock
ff32880a54 clean up 2025-02-13 14:55:11 -05:00
Brandon Hancock
a38483e1b4 It works! 2025-02-12 17:28:35 -05:00
Brandon Hancock
7910dc9337 wip 2025-02-12 11:10:00 -05:00
Brandon Hancock
796e50aba8 WIP 2025-02-10 16:11:29 -05:00
10 changed files with 989 additions and 128 deletions

View File

@@ -1,7 +1,7 @@
import re
import shutil
import subprocess
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from typing import Any, Dict, List, Literal, Optional, Sequence, Union, cast
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -170,27 +170,19 @@ class Agent(BaseAgent):
Output of the agent
"""
if self.tools_handler:
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")
task_prompt = task.prompt()
# If the task requires output in JSON or Pydantic format,
# append specific instructions to the task prompt to ensure
# that the final answer does not include any code block markers
if task.output_json or task.output_pydantic:
# Generate the schema based on the output format
if task.output_json:
# schema = json.dumps(task.output_json, indent=2)
schema = generate_model_description(task.output_json)
task_prompt += "\n" + self.i18n.slice(
"formatted_task_instructions"
).format(output_format=schema)
elif task.output_pydantic:
schema = generate_model_description(task.output_pydantic)
task_prompt += "\n" + self.i18n.slice(
"formatted_task_instructions"
).format(output_format=schema)
# Choose the output format, preferring output_json if available
output_format = (
task.output_json if task.output_json else task.output_pydantic
)
schema = generate_model_description(cast(type, output_format))
task_prompt += f"\n{self.i18n.slice('formatted_task_instructions').format(output_format=schema)}"
if context:
task_prompt = self.i18n.slice("task_with_context").format(
@@ -276,9 +268,6 @@ class Agent(BaseAgent):
raise e
result = self.execute_task(task, context, tools)
if self.max_rpm and self._rpm_controller:
self._rpm_controller.stop_rpm_counter()
# If there was any tool in self.tools_results that had result_as_answer
# set to True, return the results of the last tool that had
# result_as_answer set to True
@@ -338,7 +327,7 @@ class Agent(BaseAgent):
request_within_rpm_limit=(
self._rpm_controller.check_or_wait if self._rpm_controller else None
),
callbacks=[TokenCalcHandler(self._token_process)],
callbacks=[TokenCalcHandler(self.token_process)],
)
def get_delegation_tools(self, agents: List[BaseAgent]):
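The hunk above collapses the duplicated output_json / output_pydantic branches into a single path that prefers output_json and appends one schema instruction. Restated as a standalone helper below; generate_model_description is stubbed here, while the real one lives in crewai.utilities.converter, and the instruction template is an illustrative placeholder.

from typing import Optional, Type

def generate_model_description(model: Type) -> str:  # stub for illustration only
    fields = getattr(model, "model_fields", {})
    return "{" + ", ".join(f'"{name}": ...' for name in fields) + "}"

def append_format_instructions(
    task_prompt: str,
    output_json: Optional[Type] = None,
    output_pydantic: Optional[Type] = None,
    instruction_template: str = "Return the final answer as: {output_format}",
) -> str:
    if output_json or output_pydantic:
        # Prefer output_json if both are set, mirroring the hunk above.
        output_format = output_json if output_json else output_pydantic
        schema = generate_model_description(output_format)
        task_prompt += f"\n{instruction_template.format(output_format=schema)}"
    return task_prompt

if __name__ == "__main__":
    from pydantic import BaseModel

    class Report(BaseModel):
        title: str
        score: float

    print(append_format_instructions("Summarize the findings.", output_pydantic=Report))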

View File

@@ -73,20 +73,27 @@ class BaseAgent(ABC, BaseModel):
Increment formatting errors.
copy() -> "BaseAgent":
Create a copy of the agent.
set_rpm_controller(rpm_controller: RPMController) -> None:
set_rpm_controller(rpm_controller: Optional[RPMController] = None) -> None:
Set the rpm controller for the agent.
set_private_attrs() -> "BaseAgent":
Set private attributes.
configure_executor(cache_handler: CacheHandler, rpm_controller: RPMController) -> None:
Configure the agent's executor with both cache and RPM handling.
"""
__hash__ = object.__hash__ # type: ignore
model_config = {
"arbitrary_types_allowed": True,
}
_logger: Logger = PrivateAttr(default_factory=lambda: Logger(verbose=False))
_rpm_controller: Optional[RPMController] = PrivateAttr(default=None)
_request_within_rpm_limit: Any = PrivateAttr(default=None)
_original_role: Optional[str] = PrivateAttr(default=None)
_original_goal: Optional[str] = PrivateAttr(default=None)
_original_backstory: Optional[str] = PrivateAttr(default=None)
_token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess)
token_process: TokenProcess = Field(default_factory=TokenProcess, exclude=True)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
formatting_errors: int = Field(
default=0, description="Number of formatting errors."
@@ -196,8 +203,6 @@ class BaseAgent(ABC, BaseModel):
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger
)
if not self._token_process:
self._token_process = TokenProcess()
return self
@@ -217,8 +222,7 @@ class BaseAgent(ABC, BaseModel):
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger
)
if not self._token_process:
self._token_process = TokenProcess()
return self
@property
@@ -266,7 +270,7 @@ class BaseAgent(ABC, BaseModel):
"_logger",
"_rpm_controller",
"_request_within_rpm_limit",
"_token_process",
"token_process",
"agent_executor",
"tools",
"tools_handler",
@@ -337,20 +341,49 @@ class BaseAgent(ABC, BaseModel):
if self.cache:
self.cache_handler = cache_handler
self.tools_handler.cache = cache_handler
self.create_agent_executor()
# Only create the executor if it hasn't been created yet.
if self.agent_executor is None:
self.create_agent_executor()
def increment_formatting_errors(self) -> None:
self.formatting_errors += 1
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.
Args:
rpm_controller: An instance of the RPMController class.
def set_rpm_controller(
self, rpm_controller: Optional[RPMController] = None
) -> None:
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.create_agent_executor()
Set the RPM controller for the agent. If no rpm_controller is provided, then:
- use self.max_rpm if set, or
- if self.crew exists and has max_rpm, use that.
"""
if self._rpm_controller is None:
if rpm_controller is not None:
self._rpm_controller = rpm_controller
elif self.max_rpm:
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger
)
elif self.crew and getattr(self.crew, "max_rpm", None):
self._rpm_controller = RPMController(
max_rpm=self.crew.max_rpm, logger=self._logger
)
# else: no rpm limit provided leave the controller None
if self.agent_executor is None:
self.create_agent_executor()
def configure_executor(
self, cache_handler: CacheHandler, rpm_controller: Optional[RPMController]
) -> None:
"""Configure the agent's executor with both cache and RPM handling.
This method delegates to set_cache_handler and set_rpm_controller, applying the configuration
only if the respective flags or values are set.
"""
if self.cache:
self.set_cache_handler(cache_handler)
# Use the injected RPM controller rather than auto-creating one
if rpm_controller:
self.set_rpm_controller(rpm_controller)
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
pass
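The new set_rpm_controller resolves its controller in a fixed order: an injected controller wins, then the agent's own max_rpm, then the crew's max_rpm, otherwise no RPM limiting at all. The same precedence, restated as a tiny standalone helper (the RPMController stand-in is illustrative):

from typing import Optional

class RPMController:  # minimal stand-in for crewAI's RPMController
    def __init__(self, max_rpm: int):
        self.max_rpm = max_rpm

def resolve_rpm_controller(
    injected: Optional[RPMController],
    agent_max_rpm: Optional[int],
    crew_max_rpm: Optional[int],
) -> Optional[RPMController]:
    if injected is not None:
        return injected
    if agent_max_rpm:
        return RPMController(max_rpm=agent_max_rpm)
    if crew_max_rpm:
        return RPMController(max_rpm=crew_max_rpm)
    return None  # no limit configured anywhere

if __name__ == "__main__":
    ctrl = resolve_rpm_controller(injected=None, agent_max_rpm=None, crew_max_rpm=60)
    print(ctrl.max_rpm if ctrl else "unlimited")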

View File

@@ -88,7 +88,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
tool.name: tool for tool in self.tools
}
self.stop = stop_words
self.llm.stop = list(set(self.llm.stop + self.stop))
self.llm.stop = list(set((self.llm.stop or []) + self.stop))
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
if "system" in self.prompt:

View File

@@ -0,0 +1,468 @@
from typing import Any, List, Optional, Type, Union, cast
from pydantic import Field, field_validator
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.task import Task
from crewai.tools import BaseTool
from crewai.tools.base_tool import Tool
from crewai.utilities.converter import Converter, generate_model_description
from crewai.utilities.token_counter_callback import (
LangChainTokenCounter,
LiteLLMTokenCounter,
)
class LangChainAgentAdapter(BaseAgent):
"""
Adapter class to wrap a LangChain agent and make it compatible with CrewAI's BaseAgent interface.
Note:
- This adapter does not require LangChain as a dependency.
- It wraps an external LangChain agent (passed as any type) and delegates calls
such as execute_task() to the LangChain agent's invoke() method.
- Extended logic is added to build prompts, incorporate memory, knowledge, training hints,
and now a human feedback loop similar to what is done in CrewAgentExecutor.
"""
langchain_agent: Any = Field(
...,
description="The wrapped LangChain runnable agent instance. It is expected to have an 'invoke' method.",
)
tools: Optional[List[Union[BaseTool, Any]]] = Field(
default_factory=list,
description="Tools at the agent's disposal. Accepts both CrewAI BaseTool instances and other tools.",
)
function_calling_llm: Optional[Any] = Field(
default=None, description="Optional function calling LLM."
)
step_callback: Optional[Any] = Field(
default=None,
description="Callback executed after each step of agent execution.",
)
allow_code_execution: Optional[bool] = Field(
default=False, description="Enable code execution for the agent."
)
multimodal: bool = Field(
default=False, description="Whether the agent is multimodal."
)
i18n: Any = None
crew: Any = None
knowledge: Any = None
token_process: TokenProcess = Field(default_factory=TokenProcess, exclude=True)
token_callback: Optional[Any] = None
class Config:
arbitrary_types_allowed = True
@field_validator("tools", mode="before")
def convert_tools(cls, value):
"""Ensure tools are valid CrewAI BaseTool instances."""
if not value:
return value
new_tools = []
for tool in value:
# If tool is already a CrewAI BaseTool instance, keep it as is.
if isinstance(tool, BaseTool):
new_tools.append(tool)
else:
new_tools.append(Tool.from_langchain(tool))
return new_tools
def _extract_text(self, message: Any) -> str:
"""
Helper to extract plain text from a message object.
This checks if the message is a dict with a "content" key, or has a "content" attribute,
or if it's a tuple from LangGraph's message format.
"""
# Handle LangGraph message tuple format (role, content)
if isinstance(message, tuple) and len(message) == 2:
return str(message[1])
# Handle dictionary with content key
elif isinstance(message, dict):
if "content" in message:
return message["content"]
# Handle LangGraph message format with additional metadata
elif "messages" in message and message["messages"]:
last_message = message["messages"][-1]
if isinstance(last_message, tuple) and len(last_message) == 2:
return str(last_message[1])
return self._extract_text(last_message)
# Handle object with content attribute
elif hasattr(message, "content") and isinstance(
getattr(message, "content"), str
):
return getattr(message, "content")
# Handle string directly
elif isinstance(message, str):
return message
# Default fallback
return str(message)
def _register_token_callback(self):
"""
Register the appropriate token counter callback with the language model.
This method handles different types of models (LiteLLM, LangChain, direct LLMs)
and different callback structures.
"""
# Skip if we already have a token callback registered
if self.token_callback is not None:
return
# Skip if we don't have a token_process attribute
if not hasattr(self, "token_process"):
return
# Determine if we're using LiteLLM or LangChain based on the agent type
if hasattr(self.langchain_agent, "client") and hasattr(
self.langchain_agent.client, "callbacks"
):
# This is likely a LiteLLM-based agent
self.token_callback = LiteLLMTokenCounter(self.token_process)
# Add our callback to the LLM directly
if isinstance(self.langchain_agent.client.callbacks, list):
if self.token_callback not in self.langchain_agent.client.callbacks:
self.langchain_agent.client.callbacks.append(self.token_callback)
else:
self.langchain_agent.client.callbacks = [self.token_callback]
else:
# This is likely a LangChain-based agent
self.token_callback = LangChainTokenCounter(self.token_process)
# Add callback to the LangChain model
if hasattr(self.langchain_agent, "callbacks"):
if self.langchain_agent.callbacks is None:
self.langchain_agent.callbacks = [self.token_callback]
elif isinstance(self.langchain_agent.callbacks, list):
self.langchain_agent.callbacks.append(self.token_callback)
# For direct LLM models
elif hasattr(self.langchain_agent, "llm") and hasattr(
self.langchain_agent.llm, "callbacks"
):
if self.langchain_agent.llm.callbacks is None:
self.langchain_agent.llm.callbacks = [self.token_callback]
elif isinstance(self.langchain_agent.llm.callbacks, list):
self.langchain_agent.llm.callbacks.append(self.token_callback)
# Direct LLM case
elif not hasattr(self.langchain_agent, "agent"):
# This might be a direct LLM, not an agent
if (
not hasattr(self.langchain_agent, "callbacks")
or self.langchain_agent.callbacks is None
):
self.langchain_agent.callbacks = [self.token_callback]
elif isinstance(self.langchain_agent.callbacks, list):
self.langchain_agent.callbacks.append(self.token_callback)
def execute_task(
self,
task: Task,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
) -> str:
"""
Execute a task by building the full task prompt (with memory, knowledge, tool instructions,
and training hints) then delegating execution to the wrapped LangChain agent.
If the task requires human input, a feedback loop is run that mimics the CrewAgentExecutor.
"""
task_prompt = task.prompt()
if task.output_json or task.output_pydantic:
# Choose the output format, preferring output_json if available
output_format = (
task.output_json if task.output_json else task.output_pydantic
)
schema = generate_model_description(cast(type, output_format))
instruction = self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
task_prompt += f"\n{instruction}"
if context:
task_prompt = self.i18n.slice("task_with_context").format(
task=task_prompt, context=context
)
if self.crew and self.crew.memory:
from crewai.memory.contextual.contextual_memory import ContextualMemory
contextual_memory = ContextualMemory(
self.crew.memory_config,
self.crew._short_term_memory,
self.crew._long_term_memory,
self.crew._entity_memory,
self.crew._user_memory,
)
memory = contextual_memory.build_context_for_task(task, context)
if memory.strip():
task_prompt += self.i18n.slice("memory").format(memory=memory)
if self.knowledge:
agent_knowledge_snippets = self.knowledge.query([task.prompt()])
if agent_knowledge_snippets:
from crewai.knowledge.utils.knowledge_utils import (
extract_knowledge_context,
)
agent_knowledge_context = extract_knowledge_context(
agent_knowledge_snippets
)
if agent_knowledge_context:
task_prompt += agent_knowledge_context
if self.crew:
knowledge_snippets = self.crew.query_knowledge([task.prompt()])
if knowledge_snippets:
from crewai.knowledge.utils.knowledge_utils import (
extract_knowledge_context,
)
crew_knowledge_context = extract_knowledge_context(knowledge_snippets)
if crew_knowledge_context:
task_prompt += crew_knowledge_context
tools = tools or self.tools or []
self.create_agent_executor(tools=tools)
self._show_start_logs(task)
if self.crew and getattr(self.crew, "_train", False):
task_prompt = self._training_handler(task_prompt=task_prompt)
else:
task_prompt = self._use_trained_data(task_prompt=task_prompt)
# Register token tracking callback
self._register_token_callback()
init_state = {"messages": [("user", task_prompt)]}
# Estimate input tokens for tracking
if hasattr(self, "token_process"):
# Rough estimate based on characters (better than word count)
estimated_prompt_tokens = len(task_prompt) // 4 # ~4 chars per token
self.token_process.sum_prompt_tokens(estimated_prompt_tokens)
state = self.agent_executor.invoke(init_state)
# Extract output from state based on its structure
if "structured_response" in state:
current_output = state["structured_response"]
elif "messages" in state and state["messages"]:
last_message = state["messages"][-1]
current_output = self._extract_text(last_message)
elif "output" in state:
current_output = str(state["output"])
else:
# Fallback to extracting text from the entire state
current_output = self._extract_text(state)
# Estimate completion tokens for tracking if we don't have actual counts
if hasattr(self, "token_process"):
# Rough estimate based on characters
estimated_completion_tokens = len(current_output) // 4 # ~4 chars per token
self.token_process.sum_completion_tokens(estimated_completion_tokens)
self.token_process.sum_successful_requests(1)
if task.human_input:
current_output = self._handle_human_feedback(current_output)
return current_output
def _handle_human_feedback(self, current_output: str) -> str:
"""
Implements a feedback loop that prompts the user for feedback and then instructs
the underlying LangChain agent to regenerate its answer with the requested changes.
Only the inner content of the output is displayed to the user.
"""
while True:
print("\nAgent output:")
# Print only the inner text extracted from current_output.
print(self._extract_text(current_output))
feedback = input("\nEnter your feedback (or press Enter to accept): ")
if not feedback.strip():
break # No feedback provided, exit the loop
extracted_output = self._extract_text(current_output)
new_prompt = (
f"Below is your previous answer:\n"
f"{extracted_output}\n\n"
f"Based on the following feedback: '{feedback}', please regenerate your answer with the requested details. "
f"Specifically, display 10 bullet points in each section. Provide the complete updated answer below.\n\n"
f"Updated answer:"
)
# Estimate input tokens for tracking
if hasattr(self, "token_process"):
# Rough estimate based on characters
estimated_prompt_tokens = len(new_prompt) // 4 # ~4 chars per token
self.token_process.sum_prompt_tokens(estimated_prompt_tokens)
try:
new_state = self.agent_executor.invoke(
{"messages": [("user", new_prompt)]}
)
# Extract output from state based on its structure
if "structured_response" in new_state:
new_output = new_state["structured_response"]
elif "messages" in new_state and new_state["messages"]:
last_message = new_state["messages"][-1]
new_output = self._extract_text(last_message)
elif "output" in new_state:
new_output = str(new_state["output"])
else:
# Fallback to extracting text from the entire state
new_output = self._extract_text(new_state)
# Estimate completion tokens for tracking
if hasattr(self, "token_process"):
# Rough estimate based on characters
estimated_completion_tokens = (
len(new_output) // 4
) # ~4 chars per token
self.token_process.sum_completion_tokens(
estimated_completion_tokens
)
self.token_process.sum_successful_requests(1)
current_output = new_output
except Exception as e:
print("Error during re-invocation with feedback:", e)
break
return current_output
def _generate_model_description(self, model: Any) -> str:
"""
Generates a string description (schema) for the expected output.
This is a placeholder that should call the actual implementation.
"""
from crewai.utilities.converter import generate_model_description
return generate_model_description(model)
def _training_handler(self, task_prompt: str) -> str:
"""
Append training instructions from Crew data to the task prompt.
"""
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.training_handler import CrewTrainingHandler
data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
if data:
agent_id = str(self.id)
if data.get(agent_id):
human_feedbacks = [
i["human_feedback"] for i in data.get(agent_id, {}).values()
]
task_prompt += (
"\n\nYou MUST follow these instructions: \n "
+ "\n - ".join(human_feedbacks)
)
return task_prompt
def _use_trained_data(self, task_prompt: str) -> str:
"""
Append pre-trained instructions from Crew data to the task prompt.
"""
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE
from crewai.utilities.training_handler import CrewTrainingHandler
data = CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).load()
if data and (trained_data_output := data.get(getattr(self, "role", "default"))):
task_prompt += (
"\n\nYou MUST follow these instructions: \n - "
+ "\n - ".join(trained_data_output["suggestions"])
)
return task_prompt
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
"""
Creates an agent executor using LangGraph's create_react_agent if given an LLM,
or uses the provided language model directly.
"""
try:
from langgraph.prebuilt import create_react_agent
except ImportError as e:
raise ImportError(
"LangGraph library not found. Please run `uv add langgraph` to add LangGraph support."
) from e
# Ensure raw_tools is always a list.
raw_tools: List[Any] = (
tools
if tools is not None
else (self.tools if self.tools is not None else [])
)
# Fallback: if raw_tools is still empty, try to extract them from the wrapped LangChain agent.
if not raw_tools:
if hasattr(self.langchain_agent, "agent") and hasattr(
self.langchain_agent.agent, "tools"
):
raw_tools = self.langchain_agent.agent.tools or []
else:
raw_tools = getattr(self.langchain_agent, "tools", []) or []
used_tools = []
# Use the global CrewAI Tool class (imported at the module level)
for tool in raw_tools:
# If the tool is a CrewAI Tool, convert it to a LangChain compatible tool.
if isinstance(tool, Tool):
used_tools.append(tool.to_langchain())
else:
used_tools.append(tool)
# Sanitize the agent's role for the "name" field. The allowed pattern is ^[a-zA-Z0-9_-]+$
import re
agent_role = getattr(self, "role", "agent")
sanitized_role = re.sub(r"\s+", "_", agent_role)
# Register token tracking callback
self._register_token_callback()
self.agent_executor = create_react_agent(
model=self.langchain_agent,
tools=used_tools,
debug=getattr(self, "verbose", False),
name=sanitized_role,
)
def _parse_tools(self, tools: List[BaseTool]) -> List[BaseTool]:
return tools
def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[BaseTool]:
return []
def get_output_converter(
self,
llm: Any,
text: str,
model: Optional[Type] = None,
instructions: str = "",
) -> Converter:
return Converter(llm=llm, text=text, model=model, instructions=instructions)
def _show_start_logs(self, task: Task) -> None:
if self.langchain_agent is None:
raise ValueError("Agent cannot be None")
# Check if the adapter or its crew is in verbose mode.
verbose = self.verbose or (self.crew and getattr(self.crew, "verbose", False))
if verbose:
from crewai.utilities import Printer
printer = Printer()
# Use the adapter's role (inherited from BaseAgent) for logging.
printer.print(
content=f"\033[1m\033[95m# Agent:\033[00m \033[1m\033[92m{self.role}\033[00m"
)
description = getattr(task, "description", "Not Found")
printer.print(
content=f"\033[95m## Task:\033[00m \033[92m{description}\033[00m"
)
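A hedged usage sketch for the adapter defined above: wrap a LangChain chat model so it behaves as a CrewAI agent. It assumes the adapter keeps BaseAgent's role/goal/backstory fields, that the guessed module path in the import is where the file lands, and that langchain-openai plus langgraph are installed; treat it as a statement of intent, not a verified snippet from this branch.

from crewai import Crew, Task
from crewai.agents.langchain_agent_adapter import LangChainAgentAdapter  # module path assumed
from langchain_openai import ChatOpenAI

researcher = LangChainAgentAdapter(
    langchain_agent=ChatOpenAI(model="gpt-4o-mini"),  # any runnable with .invoke()
    role="Researcher",
    goal="Summarize the topic in three bullet points",
    backstory="A concise analyst.",
    verbose=True,
)

task = Task(
    description="Summarize the main trade-offs of token-count estimation.",
    expected_output="Three bullet points.",
    agent=researcher,
)

crew = Crew(agents=[researcher], tasks=[task])

if __name__ == "__main__":
    crew.kickoff()
    # Token usage accumulates on the now-public token_process field, estimated at
    # roughly 4 characters per token when the provider reports no usage data.
    print(researcher.token_process.get_summary())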

View File

@@ -94,7 +94,7 @@ class Crew(BaseModel):
__hash__ = object.__hash__ # type: ignore
_execution_span: Any = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr()
_rpm_controller: Optional[RPMController] = PrivateAttr()
_logger: Logger = PrivateAttr()
_file_handler: FileHandler = PrivateAttr()
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
@@ -248,7 +248,6 @@ class Crew(BaseModel):
@model_validator(mode="after")
def set_private_attrs(self) -> "Crew":
"""Set private attributes."""
self._cache_handler = CacheHandler()
self._logger = Logger(verbose=self.verbose)
if self.output_log_file:
self._file_handler = FileHandler(self.output_log_file)
@@ -258,6 +257,24 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def initialize_dependencies(self) -> "Crew":
# Always create a cache handler, but it will only be used if self.cache is True
# Create the Crew-level RPM controller if a max RPM is specified
if self.max_rpm is not None:
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=Logger(verbose=self.verbose)
)
else:
self._rpm_controller = None
# Now inject these external dependencies into each agent
for agent in self.agents:
agent.crew = self # ensure the agent's crew reference is set
agent.configure_executor(self._cache_handler, self._rpm_controller)
return self
@model_validator(mode="after")
def create_crew_memory(self) -> "Crew":
"""Set private attributes."""
@@ -357,10 +374,7 @@ class Crew(BaseModel):
if self.agents:
for agent in self.agents:
if self.cache:
agent.set_cache_handler(self._cache_handler)
if self.max_rpm:
agent.set_rpm_controller(self._rpm_controller)
agent.configure_executor(self._cache_handler, self._rpm_controller)
return self
@model_validator(mode="after")
@@ -627,7 +641,7 @@ class Crew(BaseModel):
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
metrics += [agent._token_process.get_summary() for agent in self.agents]
metrics += [agent.token_process.get_summary() for agent in self.agents]
self.usage_metrics = UsageMetrics()
for metric in metrics:
@@ -1174,19 +1188,22 @@ class Crew(BaseModel):
agent.interpolate_inputs(inputs)
def _finish_execution(self, final_string_output: str) -> None:
if self.max_rpm:
if self._rpm_controller:
self._rpm_controller.stop_rpm_counter()
def calculate_usage_metrics(self) -> UsageMetrics:
"""Calculates and returns the usage metrics."""
total_usage_metrics = UsageMetrics()
for agent in self.agents:
if hasattr(agent, "_token_process"):
token_sum = agent._token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
token_sum = self.manager_agent._token_process.get_summary()
# Directly access token_process since it's now a field in BaseAgent
token_sum = agent.token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
if self.manager_agent:
# Directly access token_process since it's now a field in BaseAgent
token_sum = self.manager_agent.token_process.get_summary()
total_usage_metrics.add_usage_metrics(token_sum)
self.usage_metrics = total_usage_metrics
return total_usage_metrics

View File

@@ -1,7 +1,7 @@
import warnings
from abc import ABC, abstractmethod
from inspect import signature
from typing import Any, Callable, Type, get_args, get_origin
from typing import Any, Callable, Optional, Type, get_args, get_origin
from pydantic import (
BaseModel,
@@ -19,11 +19,21 @@ from crewai.tools.structured_tool import CrewStructuredTool
warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20)
# Define a helper function with an explicit signature
def default_cache_function(
_args: Optional[Any] = None, _result: Optional[Any] = None
) -> bool:
return True
class BaseTool(BaseModel, ABC):
class _ArgsSchemaPlaceholder(PydanticBaseModel):
pass
model_config = ConfigDict()
model_config = ConfigDict(
arbitrary_types_allowed=True,
from_attributes=True, # Allow conversion from ORM objects
)
name: str
"""The unique name of the tool that clearly communicates its purpose."""
@@ -33,8 +43,10 @@ class BaseTool(BaseModel, ABC):
"""The schema for the arguments that the tool accepts."""
description_updated: bool = False
"""Flag to check if the description has been updated."""
cache_function: Callable = lambda _args=None, _result=None: True
"""Function that will be used to determine if the tool should be cached, should return a boolean. If None, the tool will be cached."""
cache_function: Callable[[Optional[Any], Optional[Any]], bool] = (
default_cache_function
)
"""Function used to determine if the tool should be cached."""
result_as_answer: bool = False
"""Flag to check if the tool should be the final agent answer."""
@@ -177,74 +189,43 @@ class BaseTool(BaseModel, ABC):
return origin.__name__
@property
def get(self) -> Callable[[str, Any], Any]:
# Instead of an inline lambda, we define a helper function with explicit types.
def _getter(key: str, default: Any = None) -> Any:
return getattr(self, key, default)
return _getter
class Tool(BaseTool):
"""The function that will be executed when the tool is called."""
"""Tool implementation that requires a function."""
func: Callable
model_config = ConfigDict(
arbitrary_types_allowed=True,
from_attributes=True,
)
def _run(self, *args: Any, **kwargs: Any) -> Any:
return self.func(*args, **kwargs)
@classmethod
def from_langchain(cls, tool: Any) -> "Tool":
"""Create a Tool instance from a CrewStructuredTool.
def to_langchain(self) -> Any:
"""Convert to a LangChain-compatible tool."""
try:
from langchain_core.tools import Tool as LC_Tool
except ImportError:
raise ImportError("langchain_core is not installed")
This method takes a CrewStructuredTool object and converts it into a
Tool instance. It ensures that the provided tool has a callable 'func'
attribute and infers the argument schema if not explicitly provided.
Args:
tool (Any): The CrewStructuredTool object to be converted.
Returns:
Tool: A new Tool instance created from the provided CrewStructuredTool.
Raises:
ValueError: If the provided tool does not have a callable 'func' attribute.
"""
if not hasattr(tool, "func") or not callable(tool.func):
raise ValueError("The provided tool must have a callable 'func' attribute.")
args_schema = getattr(tool, "args_schema", None)
if args_schema is None:
# Infer args_schema from the function signature if not provided
func_signature = signature(tool.func)
annotations = func_signature.parameters
args_fields = {}
for name, param in annotations.items():
if name != "self":
param_annotation = (
param.annotation if param.annotation != param.empty else Any
)
field_info = Field(
default=...,
description="",
)
args_fields[name] = (param_annotation, field_info)
if args_fields:
args_schema = create_model(f"{tool.name}Input", **args_fields)
else:
# Create a default schema with no fields if no parameters are found
args_schema = create_model(
f"{tool.name}Input", __base__=PydanticBaseModel
)
return cls(
name=getattr(tool, "name", "Unnamed Tool"),
description=getattr(tool, "description", ""),
func=tool.func,
args_schema=args_schema,
# Use self._run (which is bound and calls self.func) so that the LC_Tool gets proper attributes.
return LC_Tool(
name=self.name,
description=self.description,
func=self._run,
args_schema=self.args_schema,
)
def to_langchain(
tools: list[BaseTool | CrewStructuredTool],
) -> list[CrewStructuredTool]:
return [t.to_structured_tool() if isinstance(t, BaseTool) else t for t in tools]
def tool(*args):
"""
Decorator to create a tool from a function.

View File

@@ -1,15 +1,52 @@
import warnings
from typing import Any, Dict, Optional
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from langchain_core.callbacks.base import BaseCallbackHandler
from litellm.integrations.custom_logger import CustomLogger
from litellm.types.utils import Usage
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
class TokenCalcHandler(CustomLogger):
def __init__(self, token_cost_process: Optional[TokenProcess]):
self.token_cost_process = token_cost_process
class AbstractTokenCounter(ABC):
"""
Abstract base class for token counting callbacks.
Implementations should track token usage from different LLM providers.
"""
def __init__(self, token_process: Optional[TokenProcess] = None):
"""Initialize with a TokenProcess instance to track tokens."""
self.token_process = token_process
@abstractmethod
def update_token_usage(self, prompt_tokens: int, completion_tokens: int) -> None:
"""Update token usage counts in the token process."""
pass
class LiteLLMTokenCounter(CustomLogger, AbstractTokenCounter):
"""
Token counter implementation for LiteLLM.
Uses LiteLLM's CustomLogger interface to track token usage.
"""
def __init__(self, token_process: Optional[TokenProcess] = None):
AbstractTokenCounter.__init__(self, token_process)
CustomLogger.__init__(self)
def update_token_usage(self, prompt_tokens: int, completion_tokens: int) -> None:
"""Update token usage counts in the token process."""
if self.token_process is None:
return
if prompt_tokens > 0:
self.token_process.sum_prompt_tokens(prompt_tokens)
if completion_tokens > 0:
self.token_process.sum_completion_tokens(completion_tokens)
self.token_process.sum_successful_requests(1)
def log_success_event(
self,
@@ -18,7 +55,11 @@ class TokenCalcHandler(CustomLogger):
start_time: float,
end_time: float,
) -> None:
if self.token_cost_process is None:
"""
Process successful LLM call and extract token usage information.
This method is called by LiteLLM after a successful completion.
"""
if self.token_process is None:
return
with warnings.catch_warnings():
@@ -26,18 +67,159 @@ class TokenCalcHandler(CustomLogger):
if isinstance(response_obj, dict) and "usage" in response_obj:
usage: Usage = response_obj["usage"]
if usage:
self.token_cost_process.sum_successful_requests(1)
prompt_tokens = 0
completion_tokens = 0
if hasattr(usage, "prompt_tokens"):
self.token_cost_process.sum_prompt_tokens(usage.prompt_tokens)
prompt_tokens = usage.prompt_tokens
elif isinstance(usage, dict) and "prompt_tokens" in usage:
prompt_tokens = usage["prompt_tokens"]
if hasattr(usage, "completion_tokens"):
self.token_cost_process.sum_completion_tokens(
usage.completion_tokens
)
completion_tokens = usage.completion_tokens
elif isinstance(usage, dict) and "completion_tokens" in usage:
completion_tokens = usage["completion_tokens"]
self.update_token_usage(prompt_tokens, completion_tokens)
# Handle cached tokens if available
if (
hasattr(usage, "prompt_tokens_details")
and usage.prompt_tokens_details
and usage.prompt_tokens_details.cached_tokens
):
self.token_cost_process.sum_cached_prompt_tokens(
self.token_process.sum_cached_prompt_tokens(
usage.prompt_tokens_details.cached_tokens
)
class LangChainTokenCounter(BaseCallbackHandler, AbstractTokenCounter):
"""
Token counter implementation for LangChain.
Implements the necessary callback methods to track token usage from LangChain responses.
"""
def __init__(self, token_process: Optional[TokenProcess] = None):
BaseCallbackHandler.__init__(self)
AbstractTokenCounter.__init__(self, token_process)
def update_token_usage(self, prompt_tokens: int, completion_tokens: int) -> None:
"""Update token usage counts in the token process."""
if self.token_process is None:
return
if prompt_tokens > 0:
self.token_process.sum_prompt_tokens(prompt_tokens)
if completion_tokens > 0:
self.token_process.sum_completion_tokens(completion_tokens)
self.token_process.sum_successful_requests(1)
@property
def ignore_llm(self) -> bool:
return False
@property
def ignore_chain(self) -> bool:
return True
@property
def ignore_agent(self) -> bool:
return False
@property
def ignore_chat_model(self) -> bool:
return False
@property
def ignore_retriever(self) -> bool:
return True
@property
def ignore_tools(self) -> bool:
return True
def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> None:
"""Called when LLM starts processing."""
pass
def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
"""Called when LLM generates a new token."""
pass
def on_llm_end(self, response: Any, **kwargs: Any) -> None:
"""
Called when LLM ends processing.
Extracts token usage from LangChain response objects.
"""
if self.token_process is None:
return
# Handle LangChain response format
if hasattr(response, "llm_output") and isinstance(response.llm_output, dict):
token_usage = response.llm_output.get("token_usage", {})
prompt_tokens = token_usage.get("prompt_tokens", 0)
completion_tokens = token_usage.get("completion_tokens", 0)
self.update_token_usage(prompt_tokens, completion_tokens)
def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
"""Called when LLM errors."""
pass
def on_chain_start(
self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
) -> None:
"""Called when a chain starts."""
pass
def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
"""Called when a chain ends."""
pass
def on_chain_error(self, error: BaseException, **kwargs: Any) -> None:
"""Called when a chain errors."""
pass
def on_tool_start(
self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
) -> None:
"""Called when a tool starts."""
pass
def on_tool_end(self, output: str, **kwargs: Any) -> None:
"""Called when a tool ends."""
pass
def on_tool_error(self, error: BaseException, **kwargs: Any) -> None:
"""Called when a tool errors."""
pass
def on_text(self, text: str, **kwargs: Any) -> None:
"""Called when text is generated."""
pass
def on_agent_start(self, serialized: Dict[str, Any], **kwargs: Any) -> None:
"""Called when an agent starts."""
pass
def on_agent_end(self, output: Any, **kwargs: Any) -> None:
"""Called when an agent ends."""
pass
def on_agent_error(self, error: BaseException, **kwargs: Any) -> None:
"""Called when an agent errors."""
pass
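A comparable sketch for the LangChain side (assuming langchain_openai is installed and that the provider populates llm_output["token_usage"], which on_llm_end reads):
from langchain_openai import ChatOpenAI
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.utilities.token_counter_callback import LangChainTokenCounter
# Sketch only: register the counter as a constructor-level LangChain callback.
token_process = TokenProcess()
handler = LangChainTokenCounter(token_process=token_process)
llm = ChatOpenAI(model="gpt-4o", callbacks=[handler])
llm.invoke("Tell me a joke.")
print(token_process.get_summary())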
# For backward compatibility
class TokenCalcHandler(LiteLLMTokenCounter):
"""
Alias for LiteLLMTokenCounter.
"""
pass
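For existing call sites the alias behaves exactly like LiteLLMTokenCounter; note that the constructor keyword is now token_process, as reflected in the updated tests below:
# Sketch only: old imports keep working because TokenCalcHandler is a thin subclass.
handler = TokenCalcHandler(token_process=TokenProcess())
assert isinstance(handler, LiteLLMTokenCounter)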

View File

@@ -547,6 +547,7 @@ def test_crew_with_delegating_agents():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_with_delegating_agents_should_not_override_task_tools():
from typing import Type
from pydantic import BaseModel, Field

View File

@@ -18,15 +18,15 @@ def test_llm_callback_replacement():
llm1 = LLM(model="gpt-4o-mini")
llm2 = LLM(model="gpt-4o-mini")
- calc_handler_1 = TokenCalcHandler(token_cost_process=TokenProcess())
- calc_handler_2 = TokenCalcHandler(token_cost_process=TokenProcess())
+ calc_handler_1 = TokenCalcHandler(token_process=TokenProcess())
+ calc_handler_2 = TokenCalcHandler(token_process=TokenProcess())
result1 = llm1.call(
messages=[{"role": "user", "content": "Hello, world!"}],
callbacks=[calc_handler_1],
)
print("result1:", result1)
- usage_metrics_1 = calc_handler_1.token_cost_process.get_summary()
+ usage_metrics_1 = calc_handler_1.token_process.get_summary()
print("usage_metrics_1:", usage_metrics_1)
result2 = llm2.call(
@@ -35,13 +35,13 @@ def test_llm_callback_replacement():
)
sleep(5)
print("result2:", result2)
- usage_metrics_2 = calc_handler_2.token_cost_process.get_summary()
+ usage_metrics_2 = calc_handler_2.token_process.get_summary()
print("usage_metrics_2:", usage_metrics_2)
# The first handler should not have been updated
assert usage_metrics_1.successful_requests == 1
assert usage_metrics_2.successful_requests == 1
- assert usage_metrics_1 == calc_handler_1.token_cost_process.get_summary()
+ assert usage_metrics_1 == calc_handler_1.token_process.get_summary()
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -57,14 +57,14 @@ def test_llm_call_with_string_input():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_call_with_string_input_and_callbacks():
llm = LLM(model="gpt-4o-mini")
- calc_handler = TokenCalcHandler(token_cost_process=TokenProcess())
+ calc_handler = TokenCalcHandler(token_process=TokenProcess())
# Test the call method with a string input and callbacks
result = llm.call(
"Tell me a joke.",
callbacks=[calc_handler],
)
- usage_metrics = calc_handler.token_cost_process.get_summary()
+ usage_metrics = calc_handler.token_process.get_summary()
assert isinstance(result, str)
assert len(result.strip()) > 0
@@ -285,6 +285,7 @@ def test_o3_mini_reasoning_effort_medium():
assert isinstance(result, str)
assert "Paris" in result
def test_context_window_validation():
"""Test that context window validation works correctly."""
# Test valid window size

View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python
"""
Test module for token tracking functionality in CrewAI.
This tests both direct LangChain models and LiteLLM integration.
"""
import os
from typing import Any, Dict
from unittest.mock import MagicMock, patch
import pytest
from langchain_core.tools import Tool
from langchain_openai import ChatOpenAI
from crewai import Crew, Process, Task
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.agents.langchain_agent_adapter import LangChainAgentAdapter
from crewai.utilities.token_counter_callback import (
LangChainTokenCounter,
LiteLLMTokenCounter,
)
def get_weather(location: str = "San Francisco"):
"""Simulates fetching current weather data for a given location."""
# In a real implementation, you could replace this with an API call.
return f"Current weather in {location}: Sunny, 25°C"
class TestTokenTracking:
"""Test suite for token tracking functionality."""
@pytest.fixture
def weather_tool(self):
"""Create a simple weather tool for testing."""
return Tool(
name="Weather",
func=get_weather,
description="Useful for fetching current weather information for a given location.",
)
@pytest.fixture
def mock_openai_response(self):
"""Create a mock OpenAI response with token usage information."""
return {
"usage": {
"prompt_tokens": 100,
"completion_tokens": 50,
"total_tokens": 150,
}
}
def test_token_process_basic(self):
"""Test basic functionality of TokenProcess class."""
token_process = TokenProcess()
# Test adding prompt tokens
token_process.sum_prompt_tokens(100)
assert token_process.prompt_tokens == 100
# Test adding completion tokens
token_process.sum_completion_tokens(50)
assert token_process.completion_tokens == 50
# Test adding successful requests
token_process.sum_successful_requests(1)
assert token_process.successful_requests == 1
# Test getting summary
summary = token_process.get_summary()
assert summary.prompt_tokens == 100
assert summary.completion_tokens == 50
assert summary.total_tokens == 150
assert summary.successful_requests == 1
@patch("litellm.completion")
def test_litellm_token_counter(self, mock_completion):
"""Test LiteLLMTokenCounter with a mock response."""
# Setup
token_process = TokenProcess()
counter = LiteLLMTokenCounter(token_process)
# Mock the response
mock_completion.return_value = {
"usage": {
"prompt_tokens": 100,
"completion_tokens": 50,
}
}
# Simulate a successful LLM call
counter.log_success_event(
kwargs={},
response_obj=mock_completion.return_value,
start_time=0,
end_time=1,
)
# Verify token counts were updated
assert token_process.prompt_tokens == 100
assert token_process.completion_tokens == 50
assert token_process.successful_requests == 1
def test_langchain_token_counter(self):
"""Test LangChainTokenCounter with a mock response."""
# Setup
token_process = TokenProcess()
counter = LangChainTokenCounter(token_process)
# Create a mock LangChain response
mock_response = MagicMock()
mock_response.llm_output = {
"token_usage": {
"prompt_tokens": 100,
"completion_tokens": 50,
}
}
# Simulate a successful LLM call
counter.on_llm_end(mock_response)
# Verify token counts were updated
assert token_process.prompt_tokens == 100
assert token_process.completion_tokens == 50
assert token_process.successful_requests == 1
@pytest.mark.skipif(
not os.environ.get("OPENAI_API_KEY"),
reason="OPENAI_API_KEY environment variable not set",
)
def test_langchain_agent_adapter_token_tracking(self, weather_tool):
"""
Integration test for token tracking with LangChainAgentAdapter.
This test requires an OpenAI API key.
"""
# Skip if LangGraph is not installed
try:
from langgraph.prebuilt import ToolNode
except ImportError:
pytest.skip("LangGraph is not installed. Install it with: uv add langgraph")
# Initialize a ChatOpenAI model
llm = ChatOpenAI(model="gpt-4o")
# Create a LangChainAgentAdapter with the direct LLM
agent = LangChainAgentAdapter(
langchain_agent=llm,
tools=[weather_tool],
role="Weather Agent",
goal="Provide current weather information for the requested location.",
backstory="An expert weather provider that fetches current weather information using simulated data.",
verbose=True,
)
# Create a weather task for the agent
task = Task(
description="Fetch the current weather for San Francisco.",
expected_output="A weather report showing current conditions in San Francisco.",
agent=agent,
)
# Create a crew with the single agent and task
crew = Crew(
agents=[agent],
tasks=[task],
verbose=True,
process=Process.sequential,
)
# Execute the crew
result = crew.kickoff()
# Verify token usage was tracked
assert result.token_usage is not None
assert result.token_usage.total_tokens > 0
assert result.token_usage.prompt_tokens > 0
assert result.token_usage.completion_tokens > 0
assert result.token_usage.successful_requests > 0
# Also verify token usage directly from the agent
usage = agent.token_process.get_summary()
assert usage.prompt_tokens > 0
assert usage.completion_tokens > 0
assert usage.total_tokens > 0
assert usage.successful_requests > 0
if __name__ == "__main__":
pytest.main(["-xvs", __file__])