Lorenze/imp/native tool calling (#4258)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

* wip: restructuring agent executor and LiteAgent

* fix: handle None task in AgentExecutor to prevent errors

Added a check to ensure that if the task is None, the method returns early without attempting to access task properties. This change improves the robustness of the AgentExecutor by preventing potential errors when the task is not set.
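
A minimal sketch of the guard, assuming the executor reads properties such as a description off its task (names beyond `task` are illustrative):

```python
from typing import Any


class AgentExecutor:
    """Illustrative skeleton; the real executor carries far more state."""

    def __init__(self, task: Any | None = None) -> None:
        self.task = task

    def _show_start_logs(self) -> None:
        # Return early when no task is set, instead of dereferencing
        # task attributes and raising AttributeError.
        if self.task is None:
            return
        print(f"Starting task: {self.task.description}")
```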

* refactor: streamline AgentExecutor initialization by removing redundant parameters

Updated the Agent class to simplify the initialization of the AgentExecutor by removing unnecessary task and crew parameters in standalone mode. This change enhances code clarity and maintains backward compatibility by ensuring that the executor is correctly configured without redundant assignments.

* wip: clean

* ensure executors work inside a flow, given the flow-in-flow async structure

* refactor: enhance agent kickoff preparation by separating common logic

Updated the Agent class to introduce a new private method that consolidates the common setup logic for both synchronous and asynchronous kickoff executions. This change improves code clarity and maintainability by reducing redundancy in the kickoff process, while ensuring that the agent can still execute effectively within both standalone and flow contexts.
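
A hedged sketch of the consolidation pattern; the private method's real name is elided in the message, so `_prepare_kickoff` and the inline bodies below are hypothetical:

```python
class Agent:
    """Sketch only: shared preparation for sync and async kickoff."""

    def __init__(self, tools: list | None = None) -> None:
        self.tools = tools or []

    def _prepare_kickoff(self, messages: list[dict]) -> dict:
        # Common setup shared by both execution paths.
        return {"messages": messages, "tools": self.tools}

    def kickoff(self, messages: list[dict]) -> str:
        inputs = self._prepare_kickoff(messages)
        return self._run(inputs)

    async def kickoff_async(self, messages: list[dict]) -> str:
        inputs = self._prepare_kickoff(messages)
        return self._run(inputs)  # the real path awaits an async executor

    def _run(self, inputs: dict) -> str:
        return f"ran with {len(inputs['messages'])} message(s)"
```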

* linting and tests

* fix test

* refactor: improve test for Agent kickoff parameters

Updated the test for the Agent class to ensure that the kickoff method correctly preserves parameters. The test now verifies the configuration of the agent after kickoff, enhancing clarity and maintainability. Additionally, the test for asynchronous kickoff within a flow context has been updated to reflect the Agent class instead of LiteAgent.

* refactor: update test task guardrail process output for improved validation

Refactored the test for task guardrail process output to enhance the validation of the output against the OpenAPI schema. The changes include a more structured request body and updated response handling to ensure compliance with the guardrail requirements. This update aims to improve the clarity and reliability of the test cases, ensuring that task outputs are correctly validated and feedback is appropriately provided.

* test fix cassette

* test fix cassette

* working

* working cassette

* refactor: streamline agent execution and enhance flow compatibility

Refactored the Agent class to simplify the execution method by removing the event loop check and clarifying the behavior when called from synchronous and asynchronous contexts. The changes ensure that the method operates seamlessly within flow methods, improving clarity in the documentation. Additionally, updated the AgentExecutor to set the response model to None, enhancing flexibility. New test cassettes were added to validate the functionality of agents within flow contexts, ensuring robust testing for both synchronous and asynchronous operations.

* fixed cassette

* Enhance Flow Execution Logic

- Introduced conditional execution for start methods in the Flow class.
- Unconditional start methods are prioritized during kickoff, while conditional starts are executed only if no unconditional starts are present.
- Improved handling of cyclic flows by allowing re-execution of conditional start methods triggered by routers.
- Added checks to continue execution chains for completed conditional starts.

These changes improve the flexibility and control of flow execution, ensuring that the correct methods are triggered based on the defined conditions.
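
A sketch of the selection rule, assuming start methods expose a `condition` attribute (`None` for unconditional); this is not the actual Flow internals:

```python
def select_start_methods(start_methods: list) -> list:
    # Unconditional starts (no trigger condition) take priority at kickoff.
    unconditional = [
        m for m in start_methods if getattr(m, "condition", None) is None
    ]
    if unconditional:
        return unconditional
    # Conditional starts run only when no unconditional start exists;
    # in cyclic flows, routers may re-trigger them in later cycles.
    return list(start_methods)
```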

* Enhance Agent and Flow Execution Logic

- Updated the Agent class to automatically detect the event loop and return a coroutine when called within a Flow, simplifying async handling for users.
- Modified Flow class to execute listeners sequentially, preventing race conditions on shared state during listener execution.
- Improved handling of coroutine results from synchronous methods, ensuring proper execution flow and state management.

These changes enhance the overall execution logic and user experience when working with agents and flows in CrewAI.
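
A hedged sketch of the event-loop detection, so `agent.kickoff(...)` stays synchronous outside a loop but yields an awaitable inside a Flow; `run_sync` stands in for the agent's real execution path:

```python
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any


def maybe_async(run_sync: Callable[[], Any]) -> Any | Coroutine[Any, Any, Any]:
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop: execute synchronously as before.
        return run_sync()
    # Inside a Flow's loop: hand back a coroutine for the caller to await,
    # running the sync body off-loop so it does not block other listeners.
    return asyncio.to_thread(run_sync)
```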

* Enhance Flow Listener Logic and Agent Imports

- Updated the Flow class to track fired OR listeners, ensuring that multi-source OR listeners only trigger once during execution. This prevents redundant executions and improves flow efficiency.
- Cleared fired OR listeners during cyclic flow resets to allow re-execution in new cycles.
- Modified the Agent class imports to include Coroutine from collections.abc, enhancing type handling for asynchronous operations.

These changes improve the control and performance of flow execution in CrewAI, ensuring more predictable behavior in complex scenarios.
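
A sketch of the once-only OR-listener bookkeeping, keyed by listener name (an assumption about the internal representation):

```python
class OrListenerTracker:
    def __init__(self) -> None:
        self._fired: set[str] = set()

    def should_fire(self, listener_name: str) -> bool:
        # A multi-source OR listener fires once, no matter how many of
        # its trigger methods complete in the same cycle.
        if listener_name in self._fired:
            return False
        self._fired.add(listener_name)
        return True

    def reset_cycle(self) -> None:
        # Cleared on cyclic-flow resets so listeners can fire again.
        self._fired.clear()
```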

* adjusted test due to new cassette

* ensure native tool calling works with liteagent

* ensure response model is respected

* Enhance Tool Name Handling for LLM Compatibility

- Added a new `sanitize_tool_name` function to replace invalid characters in function names with underscores, ensuring compatibility with LLM providers.
- Updated the tool validation logic to sanitize tool names before validation.
- Modified tool registration to use the sanitized names.

These changes improve the robustness of tool name handling, preventing potential issues with invalid characters in function names.
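
A hedged approximation of the sanitizer; the shipped version lives in `crewai.utilities.string_utils` and, per a later commit in this PR, also lowercases and splits camelCase, so the regexes below are an educated reconstruction:

```python
import re


def sanitize_tool_name(name: str) -> str:
    # Split camelCase boundaries, replace characters LLM providers
    # reject with underscores, then lowercase.
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)
    name = re.sub(r"[^a-zA-Z0-9_-]", "_", name)
    return name.lower()
```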

* ensure we don't finalize the batch on just a LiteAgent finishing

* wip: max tools per turn, and ensure we drop timing prints

* fix sync main issues

* fix llm_call_completed event serialization issue

* drop max_tools_iterations

* fix model_dump with proxied state

* Add extract_tool_call_info function to handle various tool call formats

- Introduced a new utility function `extract_tool_call_info` to extract tool call ID, name, and arguments from different provider formats (OpenAI, Gemini, Anthropic, and dictionary).
- This enhancement improves the flexibility and compatibility of tool calls across multiple LLM providers, ensuring consistent handling of tool call information.
- The function returns a tuple containing the call ID, function name, and function arguments, or None if the format is unrecognized.
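
A sketch of `extract_tool_call_info`, mirroring the per-provider checks visible in the diff below (the shipped utility also sanitizes the returned name):

```python
from typing import Any


def extract_tool_call_info(tool_call: Any) -> tuple[str, str, Any] | None:
    if hasattr(tool_call, "function"):
        # OpenAI-style: .function.name and .function.arguments
        call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
        return call_id, tool_call.function.name, tool_call.function.arguments
    if getattr(tool_call, "function_call", None):
        # Gemini-style: .function_call.name and .function_call.args
        fc = tool_call.function_call
        return f"call_{id(tool_call)}", fc.name, dict(fc.args or {})
    if hasattr(tool_call, "name") and hasattr(tool_call, "input"):
        # Anthropic-style ToolUseBlock: .name and .input (already a dict)
        call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
        return call_id, tool_call.name, tool_call.input
    if isinstance(tool_call, dict):
        # Dict form, including Bedrock's "toolUseId" and "input" keys
        call_id = (
            tool_call.get("id")
            or tool_call.get("toolUseId")
            or f"call_{id(tool_call)}"
        )
        func = tool_call.get("function", {})
        name = func.get("name", "") or tool_call.get("name", "")
        args = func.get("arguments", "{}") or tool_call.get("input", {})
        return call_id, name, args
    return None  # unrecognized format
```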

* Refactor AgentExecutor to support batch execution of native tool calls

- Updated the `execute_native_tool` method to process all tools from `pending_tool_calls` in a single batch, enhancing efficiency and reducing the number of round trips to the LLM.
- Introduced a new utility function `extract_tool_call_info` to streamline the extraction of tool call details, improving compatibility with various tool formats.
- Removed a now-redundant parameter, simplifying executor initialization.
- Enhanced logging and message handling to provide clearer insights during tool execution.
- This refactor improves the overall performance and usability of the agent execution flow.
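
A runnable illustration of the message protocol the batch handler maintains: one assistant message carrying the batch of `tool_calls`, then one `tool`-role message per executed call (`web_search` is a made-up tool):

```python
import json

messages: list[dict] = []
tool_calls = [
    {
        "id": "call_1",
        "type": "function",
        "function": {"name": "web_search", "arguments": json.dumps({"q": "crewai"})},
    },
]

# One assistant message reports the whole batch of requested calls...
messages.append({"role": "assistant", "content": None, "tool_calls": tool_calls})

# ...then each executed call appends its own "tool"-role result message.
for call in tool_calls:
    args = json.loads(call["function"]["arguments"])
    result = f"searched for {args['q']}"  # stand-in for the real tool
    messages.append(
        {
            "role": "tool",
            "tool_call_id": call["id"],
            "name": call["function"]["name"],
            "content": result,
        }
    )
```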

* Update English translations for tool usage and reasoning instructions

- Revised the `post_tool_reasoning` message to clarify the analysis process after tool usage, emphasizing the need to provide only the final answer if requirements are met.
- Updated the `format` message to simplify the instructions for deciding between using a tool or providing a final answer, enhancing clarity for users.
- These changes improve the overall user experience by providing clearer guidance on task execution and response formatting.

* fix

* fixing azure tests

* organize imports

* dropped unused

* Remove debug print statements from AgentExecutor to clean up the code and improve readability. This change enhances the overall performance of the agent execution flow by eliminating unnecessary console output during LLM calls and iterations.

* linted

* updated cassette

* regen cassette

* revert crew agent executor

* adjust cassettes and drop tests obsoleted by the native tool implementation

* adjust

* ensure we properly fail tools and emit their events

* Enhance tool handling and delegation tracking in agent executors

- Implemented immediate return for tools with result_as_answer=True in crew_agent_executor.py.
- Added delegation tracking functionality in agent_utils.py to increment delegations when specific tools are used.
- Updated tool usage logic to handle caching more effectively in tool_usage.py.
- Enhanced test cases to validate new delegation features and tool caching behavior.

This update improves the efficiency of tool execution and enhances the delegation capabilities of agents.
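
A sketch of the delegation counter; `track_delegation_if_needed` is the helper named in this PR's diff, but the body and the sanitized tool names below are approximations:

```python
DELEGATION_TOOLS = {"delegate_work_to_coworker", "ask_question_to_coworker"}


def track_delegation_if_needed(func_name: str, args: dict, task) -> None:
    # Increment the task's delegation count when a delegation tool runs.
    if task is not None and func_name in DELEGATION_TOOLS:
        task.increment_delegations()  # assumed counter method on Task
```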

* fix cassettes

* fix

* regen cassettes

* regen gemini

* ensure we support bedrock

* supporting bedrock

* regen azure cassettes

* Implement max usage count tracking for tools in agent executors

- Added functionality to check if a tool has reached its maximum usage count before execution in both crew_agent_executor.py and agent_executor.py.
- Enhanced error handling to return a message when a tool's usage limit is reached.
- Updated tool usage logic in tool_usage.py to increment usage counts and print current usage status.
- Introduced tests to validate max usage count behavior for native tool calling, ensuring proper enforcement and tracking.

This update improves tool management by preventing overuse and providing clear feedback when limits are reached.
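
A sketch of the pre-execution guard; `max_usage_count` and `current_usage_count` are the tool fields referenced in this diff:

```python
def usage_limit_reached(tool) -> bool:
    # A limit applies only when max_usage_count is set, and blocks
    # execution once the current count reaches it.
    return (
        getattr(tool, "max_usage_count", None) is not None
        and tool.current_usage_count >= tool.max_usage_count
    )


class CountingTool:
    """Minimal stand-in for a BaseTool with a usage limit."""

    max_usage_count = 2
    current_usage_count = 0


tool = CountingTool()
for _ in range(3):
    if usage_limit_reached(tool):
        print(f"Tool has reached its usage limit of {tool.max_usage_count} times")
        break
    tool.current_usage_count += 1
```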

* fix other test

* fix test

* drop logs

* better tests

* regen

* regen all azure cassettes

* regen again placeholder for cassette matching

* fix: unify tool name sanitization across codebase

* fix: include tool role messages in save_last_messages

* fix: update sanitize_tool_name test expectations

Align test expectations with unified sanitize_tool_name behavior
that lowercases and splits camelCase for LLM provider compatibility.
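
Illustrative expectations consistent with that behavior; the exact outputs are assumptions about the shipped `sanitize_tool_name`:

```python
from crewai.utilities.string_utils import sanitize_tool_name

# Assumed outputs: spaces become underscores, camelCase is split, and
# the result is lowercased.
assert sanitize_tool_name("Delegate work to coworker") == "delegate_work_to_coworker"
assert sanitize_tool_name("WebSearchTool") == "web_search_tool"
```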

* fix: apply sanitize_tool_name consistently across codebase

Unify tool name sanitization to ensure consistency between tool names
shown to LLMs and tool name matching/lookup logic.

* regen

* fix: sanitize tool names in native tool call processing

- Update extract_tool_call_info to return sanitized tool names
- Fix delegation tool name matching to use sanitized names
- Add sanitization in crew_agent_executor tool call extraction
- Add sanitization in experimental agent_executor
- Add sanitization in LLM.call function lookup
- Update streaming utility to use sanitized names
- Update base_agent_executor_mixin delegation check

* Extract text content from parts directly to avoid warning about non-text parts

* Add test case for Gemini token usage tracking

- Introduced a new YAML cassette for tracking token usage in Gemini API responses.
- Updated the test for Gemini to validate token usage metrics and response content.
- Ensured proper integration with the Gemini model and API key handling.

---------

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
Lorenze Jay
2026-01-22 17:44:03 -08:00
committed by GitHub
parent 06d953bf46
commit bd4d039f63
112 changed files with 18227 additions and 26180 deletions

View File

@@ -89,6 +89,7 @@ from crewai.utilities.guardrail_types import GuardrailType
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.prompts import Prompts, StandardPromptResult, SystemPromptResult
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.string_utils import sanitize_tool_name
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -314,6 +315,22 @@ class Agent(BaseAgent):
return any(getattr(self.crew, attr) for attr in memory_attributes)
def _supports_native_tool_calling(self, tools: list[BaseTool]) -> bool:
"""Check if the LLM supports native function calling with the given tools.
Args:
tools: List of tools to check against.
Returns:
True if native function calling is supported and tools are available.
"""
return (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and len(tools) > 0
)
def execute_task(
self,
task: Task,
@@ -762,9 +779,12 @@ class Agent(BaseAgent):
raw_tools: list[BaseTool] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)
use_native_tool_calling = self._supports_native_tool_calling(raw_tools)
prompt = Prompts(
agent=self,
has_tools=len(raw_tools) > 0,
use_native_tool_calling=use_native_tool_calling,
i18n=self.i18n,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
@@ -1320,10 +1340,10 @@ class Agent(BaseAgent):
args_schema = None
if hasattr(tool, "inputSchema") and tool.inputSchema:
args_schema = self._json_schema_to_pydantic(
tool.name, tool.inputSchema
sanitize_tool_name(tool.name), tool.inputSchema
)
schemas[tool.name] = {
schemas[sanitize_tool_name(tool.name)] = {
"description": getattr(tool, "description", ""),
"args_schema": args_schema,
}
@@ -1479,7 +1499,7 @@ class Agent(BaseAgent):
"""
return "\n".join(
[
f"Tool name: {tool.name}\nTool description:\n{tool.description}"
f"Tool name: {sanitize_tool_name(tool.name)}\nTool description:\n{tool.description}"
for tool in tools
]
)
@@ -1663,9 +1683,11 @@ class Agent(BaseAgent):
}
# Build prompt for standalone execution
use_native_tool_calling = self._supports_native_tool_calling(raw_tools)
prompt = Prompts(
agent=self,
has_tools=len(raw_tools) > 0,
use_native_tool_calling=use_native_tool_calling,
i18n=self.i18n,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
@@ -1773,7 +1795,6 @@ class Agent(BaseAgent):
)
output = self._execute_and_build_output(executor, inputs, response_format)
if self.guardrail is not None:
output = self._process_kickoff_guardrail(
output=output,

View File

@@ -17,6 +17,7 @@ from crewai.events.types.knowledge_events import (
)
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
@@ -236,14 +237,40 @@ def process_tool_results(agent: Agent, result: Any) -> Any:
def save_last_messages(agent: Agent) -> None:
"""Save the last messages from agent executor.
Sanitizes messages to be compatible with TaskOutput's LLMMessage type,
which accepts 'user', 'assistant', 'system', and 'tool' roles.
Preserves tool_call_id/name for tool messages and tool_calls for assistant messages.
Args:
agent: The agent instance.
"""
agent._last_messages = (
agent.agent_executor.messages.copy()
if agent.agent_executor and hasattr(agent.agent_executor, "messages")
else []
)
if not agent.agent_executor or not hasattr(agent.agent_executor, "messages"):
agent._last_messages = []
return
sanitized_messages: list[LLMMessage] = []
for msg in agent.agent_executor.messages:
role = msg.get("role", "")
if role not in ("user", "assistant", "system", "tool"):
continue
content = msg.get("content")
if content is None:
content = ""
sanitized_msg: LLMMessage = {"role": role, "content": content}
if role == "tool":
tool_call_id = msg.get("tool_call_id")
if tool_call_id:
sanitized_msg["tool_call_id"] = tool_call_id
name = msg.get("name")
if name:
sanitized_msg["name"] = name
elif role == "assistant":
tool_calls = msg.get("tool_calls")
if tool_calls:
sanitized_msg["tool_calls"] = tool_calls
sanitized_messages.append(sanitized_msg)
agent._last_messages = sanitized_messages
def prepare_tools(

View File

@@ -3,6 +3,8 @@ from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any
from crewai.utilities.string_utils import sanitize_tool_name as _sanitize_tool_name
if TYPE_CHECKING:
from crewai.tools.base_tool import BaseTool
@@ -35,4 +37,4 @@ class BaseToolAdapter(ABC):
@staticmethod
def sanitize_tool_name(tool_name: str) -> str:
"""Sanitize tool name for API compatibility."""
return tool_name.replace(" ", "_")
return _sanitize_tool_name(tool_name)

View File

@@ -7,7 +7,6 @@ to OpenAI Assistant-compatible format using the agents library.
from collections.abc import Awaitable
import inspect
import json
import re
from typing import Any, cast
from crewai.agents.agent_adapters.base_tool_adapter import BaseToolAdapter
@@ -17,6 +16,7 @@ from crewai.agents.agent_adapters.openai_agents.protocols import (
)
from crewai.tools import BaseTool
from crewai.utilities.import_utils import require
from crewai.utilities.string_utils import sanitize_tool_name
agents_module = cast(
@@ -78,18 +78,6 @@ class OpenAIAgentToolAdapter(BaseToolAdapter):
if not tools:
return []
def sanitize_tool_name(name: str) -> str:
"""Convert tool name to match OpenAI's required pattern.
Args:
name: Original tool name.
Returns:
Sanitized tool name matching OpenAI requirements.
"""
return re.sub(r"[^a-zA-Z0-9_-]", "_", name).lower()
def create_tool_wrapper(tool: BaseTool) -> Any:
"""Create a wrapper function that handles the OpenAI function tool interface.

View File

@@ -10,6 +10,7 @@ from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import sanitize_tool_name
if TYPE_CHECKING:
@@ -36,7 +37,7 @@ class CrewAgentExecutorMixin:
self.crew
and self.agent
and self.task
and "Action: Delegate work to coworker" not in output.text
and f"Action: {sanitize_tool_name('Delegate work to coworker')}" not in output.text
):
try:
if (

View File

@@ -30,6 +30,7 @@ from crewai.hooks.llm_hooks import (
)
from crewai.utilities.agent_utils import (
aget_llm_response,
convert_tools_to_openai_schema,
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
@@ -41,10 +42,12 @@ from crewai.utilities.agent_utils import (
has_reached_max_iterations,
is_context_length_exceeded,
process_llm_response,
track_delegation_if_needed,
)
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import sanitize_tool_name
from crewai.utilities.tool_utils import (
aexecute_tool_and_check_finality,
execute_tool_and_check_finality,
@@ -215,6 +218,33 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _invoke_loop(self) -> AgentFinish:
"""Execute agent loop until completion.
Checks if the LLM supports native function calling and uses that
approach if available, otherwise falls back to the ReAct text pattern.
Returns:
Final answer from the agent.
"""
# Check if model supports native function calling
use_native_tools = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and self.original_tools
)
if use_native_tools:
return self._invoke_loop_native_tools()
# Fall back to ReAct text-based pattern
return self._invoke_loop_react()
def _invoke_loop_react(self) -> AgentFinish:
"""Execute agent loop using ReAct text-based pattern.
This is the traditional approach where tool definitions are embedded
in the prompt and the LLM outputs Action/Action Input text that is
parsed to execute tools.
Returns:
Final answer from the agent.
"""
@@ -244,6 +274,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
response_model=self.response_model,
executor_context=self,
)
if self.response_model is not None:
try:
self.response_model.model_validate_json(answer)
@@ -333,6 +364,430 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
def _invoke_loop_native_tools(self) -> AgentFinish:
"""Execute agent loop using native function calling.
This method uses the LLM's native tool/function calling capability
instead of the text-based ReAct pattern. The LLM directly returns
structured tool calls which are executed and results fed back.
Returns:
Final answer from the agent.
"""
# Convert tools to OpenAI schema format
if not self.original_tools:
# No tools available, fall back to simple LLM call
return self._invoke_loop_native_no_tools()
openai_tools, available_functions = convert_tools_to_openai_schema(
self.original_tools
)
while True:
try:
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
None,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
)
self._show_logs(formatted_answer)
return formatted_answer
enforce_rpm_limit(self.request_within_rpm_limit)
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution
# via _handle_native_tool_calls to properly manage message history.
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
tools=openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
# Check if the response is a list of tool calls
if (
isinstance(answer, list)
and answer
and self._is_tool_call_list(answer)
):
# Handle tool calls - execute tools and add results to messages
tool_finish = self._handle_native_tool_calls(
answer, available_functions
)
# If tool has result_as_answer=True, return immediately
if tool_finish is not None:
return tool_finish
# Continue loop to let LLM analyze results and decide next steps
continue
# Text or other response - handle as potential final answer
if isinstance(answer, str):
# Text response - this is the final answer
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(formatted_answer)
self._append_message(answer) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
# Unexpected response type, treat as final answer
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(formatted_answer)
self._append_message(str(answer)) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
raise e
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
continue
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
def _invoke_loop_native_no_tools(self) -> AgentFinish:
"""Execute a simple LLM call when no tools are available.
Returns:
Final answer from the agent.
"""
enforce_rpm_limit(self.request_within_rpm_limit)
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._show_logs(formatted_answer)
return formatted_answer
def _is_tool_call_list(self, response: list[Any]) -> bool:
"""Check if a response is a list of tool calls.
Args:
response: The response to check.
Returns:
True if the response appears to be a list of tool calls.
"""
if not response:
return False
first_item = response[0]
# OpenAI-style
if hasattr(first_item, "function") or (
isinstance(first_item, dict) and "function" in first_item
):
return True
# Anthropic-style (object with attributes)
if (
hasattr(first_item, "type")
and getattr(first_item, "type", None) == "tool_use"
):
return True
if hasattr(first_item, "name") and hasattr(first_item, "input"):
return True
# Bedrock-style (dict with name and input keys)
if (
isinstance(first_item, dict)
and "name" in first_item
and "input" in first_item
):
return True
# Gemini-style
if hasattr(first_item, "function_call") and first_item.function_call:
return True
return False
def _handle_native_tool_calls(
self,
tool_calls: list[Any],
available_functions: dict[str, Callable[..., Any]],
) -> AgentFinish | None:
"""Handle a single native tool call from the LLM.
Executes only the FIRST tool call and appends the result to message history.
This enables sequential tool execution with reflection after each tool,
allowing the LLM to reason about results before deciding on next steps.
Args:
tool_calls: List of tool calls from the LLM (only first is processed).
available_functions: Dict mapping function names to callables.
Returns:
AgentFinish if tool has result_as_answer=True, None otherwise.
"""
from datetime import datetime
import json
from crewai.events import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
if not tool_calls:
return None
# Only process the FIRST tool call for sequential execution with reflection
tool_call = tool_calls[0]
# Extract tool call info - handle OpenAI-style, Anthropic-style, and Gemini-style
if hasattr(tool_call, "function"):
# OpenAI-style: has .function.name and .function.arguments
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = sanitize_tool_name(tool_call.function.name)
func_args = tool_call.function.arguments
elif hasattr(tool_call, "function_call") and tool_call.function_call:
# Gemini-style: has .function_call.name and .function_call.args
call_id = f"call_{id(tool_call)}"
func_name = sanitize_tool_name(tool_call.function_call.name)
func_args = (
dict(tool_call.function_call.args)
if tool_call.function_call.args
else {}
)
elif hasattr(tool_call, "name") and hasattr(tool_call, "input"):
# Anthropic format: has .name and .input (ToolUseBlock)
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = sanitize_tool_name(tool_call.name)
func_args = tool_call.input # Already a dict in Anthropic
elif isinstance(tool_call, dict):
# Support OpenAI "id", Bedrock "toolUseId", or generate one
call_id = (
tool_call.get("id")
or tool_call.get("toolUseId")
or f"call_{id(tool_call)}"
)
func_info = tool_call.get("function", {})
func_name = sanitize_tool_name(
func_info.get("name", "") or tool_call.get("name", "")
)
func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
else:
return None
# Append assistant message with single tool call
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": func_args
if isinstance(func_args, str)
else json.dumps(func_args),
},
}
],
}
self.messages.append(assistant_message)
# Parse arguments for the single tool call
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown"
# Find original tool by matching sanitized name (needed for cache_function and result_as_answer)
original_tool = None
for tool in self.original_tools or []:
if sanitize_tool_name(tool.name) == func_name:
original_tool = tool
break
# Check if tool has reached max usage count
max_usage_reached = False
if original_tool:
if (
hasattr(original_tool, "max_usage_count")
and original_tool.max_usage_count is not None
and original_tool.current_usage_count >= original_tool.max_usage_count
):
max_usage_reached = True
# Check cache before executing
from_cache = False
input_str = json.dumps(args_dict) if args_dict else ""
if self.tools_handler and self.tools_handler.cache:
cached_result = self.tools_handler.cache.read(
tool=func_name, input=input_str
)
if cached_result is not None:
result = (
str(cached_result)
if not isinstance(cached_result, str)
else cached_result
)
from_cache = True
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
),
)
track_delegation_if_needed(func_name, args_dict, self.task)
# Execute the tool (only if not cached and not at max usage)
if not from_cache and not max_usage_reached:
result = "Tool not found"
if func_name in available_functions:
try:
tool_func = available_functions[func_name]
raw_result = tool_func(**args_dict)
# Add to cache after successful execution (before string conversion)
if self.tools_handler and self.tools_handler.cache:
should_cache = True
if (
original_tool
and hasattr(original_tool, "cache_function")
and original_tool.cache_function
):
should_cache = original_tool.cache_function(
args_dict, raw_result
)
if should_cache:
self.tools_handler.cache.add(
tool=func_name, input=input_str, output=raw_result
)
# Convert to string for message
result = (
str(raw_result)
if not isinstance(raw_result, str)
else raw_result
)
except Exception as e:
result = f"Error executing tool: {e}"
if self.task:
self.task.increment_tools_errors()
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
error=e,
),
)
elif max_usage_reached:
# Return error message when max usage limit is reached
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"name": func_name,
"content": result,
}
self.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
cache_info = " (from cache)" if from_cache else ""
self._printer.print(
content=f"Tool {func_name} executed with result{cache_info}: {result[:200]}...",
color="green",
)
if (
original_tool
and hasattr(original_tool, "result_as_answer")
and original_tool.result_as_answer
):
# Return immediately with tool result as final answer
return AgentFinish(
thought="Tool result is the final answer",
output=result,
text=result,
)
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.messages.append(reasoning_message)
return None
async def ainvoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute the agent asynchronously with given inputs.
@@ -382,6 +837,29 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
async def _ainvoke_loop(self) -> AgentFinish:
"""Execute agent loop asynchronously until completion.
Checks if the LLM supports native function calling and uses that
approach if available, otherwise falls back to the ReAct text pattern.
Returns:
Final answer from the agent.
"""
# Check if model supports native function calling
use_native_tools = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and self.original_tools
)
if use_native_tools:
return await self._ainvoke_loop_native_tools()
# Fall back to ReAct text-based pattern
return await self._ainvoke_loop_react()
async def _ainvoke_loop_react(self) -> AgentFinish:
"""Execute agent loop asynchronously using ReAct text-based pattern.
Returns:
Final answer from the agent.
"""
@@ -495,6 +973,140 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
async def _ainvoke_loop_native_tools(self) -> AgentFinish:
"""Execute agent loop asynchronously using native function calling.
This method uses the LLM's native tool/function calling capability
instead of the text-based ReAct pattern.
Returns:
Final answer from the agent.
"""
# Convert tools to OpenAI schema format
if not self.original_tools:
return await self._ainvoke_loop_native_no_tools()
openai_tools, available_functions = convert_tools_to_openai_schema(
self.original_tools
)
while True:
try:
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
None,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
)
self._show_logs(formatted_answer)
return formatted_answer
enforce_rpm_limit(self.request_within_rpm_limit)
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution
# via _handle_native_tool_calls to properly manage message history.
answer = await aget_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
tools=openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
# Check if the response is a list of tool calls
if (
isinstance(answer, list)
and answer
and self._is_tool_call_list(answer)
):
# Handle tool calls - execute tools and add results to messages
tool_finish = self._handle_native_tool_calls(
answer, available_functions
)
# If tool has result_as_answer=True, return immediately
if tool_finish is not None:
return tool_finish
# Continue loop to let LLM analyze results and decide next steps
continue
# Text or other response - handle as potential final answer
if isinstance(answer, str):
# Text response - this is the final answer
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(formatted_answer)
self._append_message(answer) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
# Unexpected response type, treat as final answer
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(formatted_answer)
self._append_message(str(answer)) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
raise e
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
continue
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
async def _ainvoke_loop_native_no_tools(self) -> AgentFinish:
"""Execute a simple async LLM call when no tools are available.
Returns:
Final answer from the agent.
"""
enforce_rpm_limit(self.request_within_rpm_limit)
answer = await aget_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._show_logs(formatted_answer)
return formatted_answer
def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> AgentAction | AgentFinish:

View File

@@ -104,6 +104,7 @@ from crewai.utilities.streaming import (
signal_end,
signal_error,
)
from crewai.utilities.string_utils import sanitize_tool_name
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -1241,10 +1242,14 @@ class Crew(FlowTrackable, BaseModel):
return existing_tools
# Create mapping of tool names to new tools
new_tool_map = {tool.name: tool for tool in new_tools}
new_tool_map = {sanitize_tool_name(tool.name): tool for tool in new_tools}
# Remove any existing tools that will be replaced
tools = [tool for tool in existing_tools if tool.name not in new_tool_map]
tools = [
tool
for tool in existing_tools
if sanitize_tool_name(tool.name) not in new_tool_map
]
# Add all new tools
tools.extend(new_tools)

View File

@@ -378,6 +378,12 @@ class EventListener(BaseEventListener):
self.formatter.handle_llm_tool_usage_finished(
event.tool_name,
)
else:
self.formatter.handle_tool_usage_finished(
event.tool_name,
event.output,
getattr(event, "run_attempts", None),
)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source: Any, event: ToolUsageErrorEvent) -> None:

View File

@@ -366,6 +366,32 @@ To enable tracing, do any one of these:
self.print_panel(content, f"🔧 Tool Execution Started (#{iteration})", "yellow")
def handle_tool_usage_finished(
self,
tool_name: str,
output: str,
run_attempts: int | None = None,
) -> None:
"""Handle tool usage finished event with panel display."""
if not self.verbose:
return
iteration = self.tool_usage_counts.get(tool_name, 1)
content = Text()
content.append("Tool Completed\n", style="green bold")
content.append("Tool: ", style="white")
content.append(f"{tool_name}\n", style="green bold")
if output:
content.append("Output: ", style="white")
content.append(f"{output}\n", style="green")
self.print_panel(
content, f"✅ Tool Execution Completed (#{iteration})", "green"
)
def handle_tool_usage_error(
self,
tool_name: str,

View File

@@ -1,6 +1,8 @@
from __future__ import annotations
from collections.abc import Callable, Coroutine
from datetime import datetime
import json
import threading
from typing import TYPE_CHECKING, Any, Literal, cast
from uuid import uuid4
@@ -17,17 +19,27 @@ from crewai.agents.parser import (
OutputParserError,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled_in_context,
)
from crewai.events.types.logging_events import (
AgentLogsExecutionEvent,
AgentLogsStartedEvent,
)
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.flow.flow import Flow, listen, or_, router, start
from crewai.hooks.llm_hooks import (
get_after_llm_call_hooks,
get_before_llm_call_hooks,
)
from crewai.utilities.agent_utils import (
convert_tools_to_openai_schema,
enforce_rpm_limit,
extract_tool_call_info,
format_message_for_llm,
get_llm_response,
handle_agent_action_core,
@@ -39,10 +51,12 @@ from crewai.utilities.agent_utils import (
is_context_length_exceeded,
is_inside_event_loop,
process_llm_response,
track_delegation_if_needed,
)
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import sanitize_tool_name
from crewai.utilities.tool_utils import execute_tool_and_check_finality
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.utilities.types import LLMMessage
@@ -72,6 +86,8 @@ class AgentReActState(BaseModel):
current_answer: AgentAction | AgentFinish | None = Field(default=None)
is_finished: bool = Field(default=False)
ask_for_human_input: bool = Field(default=False)
use_native_tools: bool = Field(default=False)
pending_tool_calls: list[Any] = Field(default_factory=list)
class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
@@ -193,14 +209,73 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
Only the instance that actually executes via invoke() will emit events.
"""
if not self._flow_initialized:
current_tracing = is_tracing_enabled_in_context()
# Now call Flow's __init__ which will replace self._state
# with Flow's managed state. Suppress flow events since this is
# an agent executor, not a user-facing flow.
super().__init__(
suppress_flow_events=True,
tracing=current_tracing if current_tracing else None,
)
self._flow_initialized = True
def _check_native_tool_support(self) -> bool:
"""Check if LLM supports native function calling.
Returns:
True if the LLM supports native function calling and tools are available.
"""
return (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and bool(self.original_tools)
)
def _setup_native_tools(self) -> None:
"""Convert tools to OpenAI schema format for native function calling."""
if self.original_tools:
self._openai_tools, self._available_functions = (
convert_tools_to_openai_schema(self.original_tools)
)
def _is_tool_call_list(self, response: list[Any]) -> bool:
"""Check if a response is a list of tool calls.
Args:
response: The response to check.
Returns:
True if the response appears to be a list of tool calls.
"""
if not response:
return False
first_item = response[0]
# Check for OpenAI-style tool call structure
if hasattr(first_item, "function") or (
isinstance(first_item, dict) and "function" in first_item
):
return True
# Check for Anthropic-style tool call structure (ToolUseBlock)
if (
hasattr(first_item, "type")
and getattr(first_item, "type", None) == "tool_use"
):
return True
if hasattr(first_item, "name") and hasattr(first_item, "input"):
return True
# Check for Bedrock-style tool call structure (dict with name and input keys)
if (
isinstance(first_item, dict)
and "name" in first_item
and "input" in first_item
):
return True
# Check for Gemini-style function call (Part with function_call)
if hasattr(first_item, "function_call") and first_item.function_call:
return True
return False
@property
def use_stop_words(self) -> bool:
"""Check to determine if stop words are being used.
@@ -233,6 +308,11 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
def initialize_reasoning(self) -> Literal["initialized"]:
"""Initialize the reasoning flow and emit agent start logs."""
self._show_start_logs()
# Check for native tool support on first iteration
if self.state.iterations == 0:
self.state.use_native_tools = self._check_native_tool_support()
if self.state.use_native_tools:
self._setup_native_tools()
return "initialized"
@listen("force_final_answer")
@@ -274,6 +354,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
# Parse the LLM response
formatted_answer = process_llm_response(answer, self.use_stop_words)
self.state.current_answer = formatted_answer
if "Final Answer:" in answer and isinstance(formatted_answer, AgentAction):
@@ -307,6 +388,79 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
handle_unknown_error(self._printer, e)
raise
@listen("continue_reasoning_native")
def call_llm_native_tools(
self,
) -> Literal["native_tool_calls", "native_finished", "context_error"]:
"""Execute LLM call with native function calling.
Always calls the LLM so it can read reflection prompts and decide
whether to provide a final answer or request more tools.
Returns routing decision based on whether tool calls or final answer.
"""
try:
# Clear pending tools - LLM will decide what to do next after reading
# the reflection prompt. It can either:
# 1. Return a final answer (string) if it has enough info
# 2. Return tool calls (possibly same ones, or different ones)
self.state.pending_tool_calls.clear()
enforce_rpm_limit(self.request_within_rpm_limit)
# Call LLM with native tools
answer = get_llm_response(
llm=self.llm,
messages=list(self.state.messages),
callbacks=self.callbacks,
printer=self._printer,
tools=self._openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=None,
executor_context=self,
)
# Check if the response is a list of tool calls
if isinstance(answer, list) and answer and self._is_tool_call_list(answer):
# Store tool calls for sequential processing
self.state.pending_tool_calls = list(answer)
return "native_tool_calls"
# Text response - this is the final answer
if isinstance(answer, str):
self.state.current_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(answer)
return "native_finished"
# Unexpected response type, treat as final answer
self.state.current_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(str(answer))
return "native_finished"
except Exception as e:
if is_context_length_exceeded(e):
self._last_context_error = e
return "context_error"
if e.__class__.__module__.startswith("litellm"):
raise e
handle_unknown_error(self._printer, e)
raise
@router(call_llm_and_parse)
def route_by_answer_type(self) -> Literal["execute_tool", "agent_finished"]:
"""Route based on whether answer is AgentAction or AgentFinish."""
@@ -317,6 +471,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
@listen("execute_tool")
def execute_tool_action(self) -> Literal["tool_completed", "tool_result_is_final"]:
"""Execute the tool action and handle the result."""
try:
action = cast(AgentAction, self.state.current_answer)
@@ -362,6 +517,14 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.is_finished = True
return "tool_result_is_final"
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.state.messages.append(reasoning_message)
return "tool_completed"
except Exception as e:
@@ -371,6 +534,248 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._console.print(error_text)
raise
@listen("native_tool_calls")
def execute_native_tool(
self,
) -> Literal["native_tool_completed", "tool_result_is_final"]:
"""Execute native tool calls in a batch.
Processes all tools from pending_tool_calls, executes them,
and appends results to the conversation history.
Returns:
"native_tool_completed" normally, or "tool_result_is_final" if
a tool with result_as_answer=True was executed.
"""
if not self.state.pending_tool_calls:
return "native_tool_completed"
# Group all tool calls into a single assistant message
tool_calls_to_report = []
for tool_call in self.state.pending_tool_calls:
info = extract_tool_call_info(tool_call)
if not info:
continue
call_id, func_name, func_args = info
tool_calls_to_report.append(
{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": func_args
if isinstance(func_args, str)
else json.dumps(func_args),
},
}
)
if tool_calls_to_report:
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": tool_calls_to_report,
}
self.state.messages.append(assistant_message)
# Now execute each tool
while self.state.pending_tool_calls:
tool_call = self.state.pending_tool_calls.pop(0)
info = extract_tool_call_info(tool_call)
if not info:
continue
call_id, func_name, func_args = info
# Parse arguments
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Get agent_key for event tracking
agent_key = (
getattr(self.agent, "key", "unknown") if self.agent else "unknown"
)
# Find original tool by matching sanitized name (needed for cache_function and result_as_answer)
original_tool = None
for tool in self.original_tools or []:
if sanitize_tool_name(tool.name) == func_name:
original_tool = tool
break
# Check if tool has reached max usage count
max_usage_reached = False
if original_tool:
if (
hasattr(original_tool, "max_usage_count")
and original_tool.max_usage_count is not None
and original_tool.current_usage_count
>= original_tool.max_usage_count
):
max_usage_reached = True
# Check cache before executing
from_cache = False
input_str = json.dumps(args_dict) if args_dict else ""
if self.tools_handler and self.tools_handler.cache:
cached_result = self.tools_handler.cache.read(
tool=func_name, input=input_str
)
if cached_result is not None:
result = (
str(cached_result)
if not isinstance(cached_result, str)
else cached_result
)
from_cache = True
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
),
)
track_delegation_if_needed(func_name, args_dict, self.task)
# Execute the tool (only if not cached and not at max usage)
if not from_cache and not max_usage_reached:
result = "Tool not found"
if func_name in self._available_functions:
try:
tool_func = self._available_functions[func_name]
raw_result = tool_func(**args_dict)
# Add to cache after successful execution (before string conversion)
if self.tools_handler and self.tools_handler.cache:
should_cache = True
if (
original_tool
and hasattr(original_tool, "cache_function")
and original_tool.cache_function
):
should_cache = original_tool.cache_function(
args_dict, raw_result
)
if should_cache:
self.tools_handler.cache.add(
tool=func_name, input=input_str, output=raw_result
)
# Convert to string for message
result = (
str(raw_result)
if not isinstance(raw_result, str)
else raw_result
)
except Exception as e:
result = f"Error executing tool: {e}"
if self.task:
self.task.increment_tools_errors()
# Emit tool usage error event
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
error=e,
),
)
elif max_usage_reached:
# Return error message when max usage limit is reached
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"name": func_name,
"content": result,
}
self.state.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
cache_info = " (from cache)" if from_cache else ""
self._printer.print(
content=f"Tool {func_name} executed with result{cache_info}: {result[:200]}...",
color="green",
)
if (
original_tool
and hasattr(original_tool, "result_as_answer")
and original_tool.result_as_answer
):
# Set the result as the final answer
self.state.current_answer = AgentFinish(
thought="Tool result is the final answer",
output=result,
text=result,
)
self.state.is_finished = True
return "tool_result_is_final"
# Add reflection prompt once after all tools in the batch
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.state.messages.append(reasoning_message)
return "native_tool_completed"
def _extract_tool_name(self, tool_call: Any) -> str:
"""Extract tool name from various tool call formats."""
if hasattr(tool_call, "function"):
return sanitize_tool_name(tool_call.function.name)
if hasattr(tool_call, "function_call") and tool_call.function_call:
return sanitize_tool_name(tool_call.function_call.name)
if hasattr(tool_call, "name"):
return sanitize_tool_name(tool_call.name)
if isinstance(tool_call, dict):
func_info = tool_call.get("function", {})
return sanitize_tool_name(func_info.get("name", "") or tool_call.get("name", "unknown"))
return "unknown"
@router(execute_native_tool)
def increment_native_and_continue(self) -> Literal["initialized"]:
"""Increment iteration counter after native tool execution."""
self.state.iterations += 1
return "initialized"
@listen("initialized")
def continue_iteration(self) -> Literal["check_iteration"]:
"""Bridge listener that connects iteration loop back to iteration check."""
@@ -379,10 +784,14 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
@router(or_(initialize_reasoning, continue_iteration))
def check_max_iterations(
self,
) -> Literal["force_final_answer", "continue_reasoning"]:
) -> Literal[
"force_final_answer", "continue_reasoning", "continue_reasoning_native"
]:
"""Check if max iterations reached before proceeding with reasoning."""
if has_reached_max_iterations(self.state.iterations, self.max_iter):
return "force_final_answer"
if self.state.use_native_tools:
return "continue_reasoning_native"
return "continue_reasoning"
@router(execute_tool_action)
@@ -391,7 +800,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.iterations += 1
return "initialized"
@listen(or_("agent_finished", "tool_result_is_final"))
@listen(or_("agent_finished", "tool_result_is_final", "native_finished"))
def finalize(self) -> Literal["completed", "skipped"]:
"""Finalize execution and emit completion logs."""
if self.state.current_answer is None:
@@ -489,6 +898,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False
self.state.use_native_tools = False
self.state.pending_tool_calls = []
if "system" in self.prompt:
prompt = cast("SystemPromptResult", self.prompt)
@@ -569,6 +980,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False
self.state.use_native_tools = False
self.state.pending_tool_calls = []
if "system" in self.prompt:
prompt = cast("SystemPromptResult", self.prompt)

View File

@@ -11,6 +11,7 @@ from crewai.experimental.evaluation.base_evaluator import (
)
from crewai.experimental.evaluation.json_parser import extract_json_from_llm_response
from crewai.task import Task
from crewai.utilities.string_utils import sanitize_tool_name
from crewai.utilities.types import LLMMessage
@@ -52,7 +53,9 @@ class ToolSelectionEvaluator(BaseEvaluator):
available_tools_info = ""
if agent.tools:
for tool in agent.tools:
available_tools_info += f"- {tool.name}: {tool.description}\n"
available_tools_info += (
f"- {sanitize_tool_name(tool.name)}: {tool.description}\n"
)
else:
available_tools_info = "No tools available"

View File

@@ -449,7 +449,7 @@ class StateProxy(Generic[T]):
"""Return the underlying state object."""
return cast(T, object.__getattribute__(self, "_proxy_state"))
def model_dump(self) -> dict[str, Any]:
def model_dump(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
"""Return state as a dictionary.
Works for both dict and BaseModel underlying states.
@@ -457,7 +457,7 @@ class StateProxy(Generic[T]):
state = object.__getattribute__(self, "_proxy_state")
if isinstance(state, dict):
return state
result: dict[str, Any] = state.model_dump()
result: dict[str, Any] = state.model_dump(*args, **kwargs)
return result

View File

@@ -50,6 +50,7 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
)
from crewai.utilities.logger_utils import suppress_warnings
from crewai.utilities.string_utils import sanitize_tool_name
if TYPE_CHECKING:
@@ -931,7 +932,6 @@ class LLM(BaseLLM):
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
if not tool_calls or not available_functions:
if response_model and self.is_litellm:
instructor_instance = InternalInstructor(
content=full_response,
@@ -1144,8 +1144,12 @@ class LLM(BaseLLM):
if response_model:
params["response_model"] = response_model
response = litellm.completion(**params)
if hasattr(response,"usage") and not isinstance(response.usage, type) and response.usage:
if (
hasattr(response, "usage")
and not isinstance(response.usage, type)
and response.usage
):
usage_info = response.usage
self._track_token_usage_internal(usage_info)
@@ -1199,16 +1203,19 @@ class LLM(BaseLLM):
)
return text_response
# --- 6) If there is no text response, no available functions, but there are tool calls, return the tool calls
if tool_calls and not available_functions and not text_response:
# --- 6) If there are tool calls but no available functions, return the tool calls
# This allows the caller (e.g., executor) to handle tool execution
if tool_calls and not available_functions:
return tool_calls
# --- 7) Handle tool calls if present
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
# --- 7) Handle tool calls if present (execute when available_functions provided)
if tool_calls and available_functions:
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
# --- 8) If tool call handling didn't return a result, emit completion event and return text response
self._handle_emit_call_events(
response=text_response,
@@ -1273,7 +1280,11 @@ class LLM(BaseLLM):
params["response_model"] = response_model
response = await litellm.acompletion(**params)
if hasattr(response,"usage") and not isinstance(response.usage, type) and response.usage:
if (
hasattr(response, "usage")
and not isinstance(response.usage, type)
and response.usage
):
usage_info = response.usage
self._track_token_usage_internal(usage_info)
@@ -1321,14 +1332,18 @@ class LLM(BaseLLM):
)
return text_response
if tool_calls and not available_functions and not text_response:
# If there are tool calls but no available functions, return the tool calls
# This allows the caller (e.g., executor) to handle tool execution
if tool_calls and not available_functions:
return tool_calls
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
# Handle tool calls if present (execute when available_functions provided)
if tool_calls and available_functions:
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
self._handle_emit_call_events(
response=text_response,
@@ -1363,7 +1378,7 @@ class LLM(BaseLLM):
"""
full_response = ""
chunk_count = 0
usage_info = None
accumulated_tool_args: defaultdict[int, AccumulatedToolArgs] = defaultdict(
@@ -1526,7 +1541,7 @@ class LLM(BaseLLM):
# --- 2) Extract function name from first tool call
tool_call = tool_calls[0]
function_name = tool_call.function.name
function_name = sanitize_tool_name(tool_call.function.name)
function_args = {} # Initialize to empty dict to avoid unbound variable
# --- 3) Check if function is available
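Taken together, these hunks change the llm.call contract: with tools but no available_functions, the call returns the raw tool calls instead of executing them. A hedged sketch of the executor-side loop this enables (the llm.call signature and message shapes are assumed from the hunks above):

```python
import json
from typing import Any, Callable

def run_native_tool_loop(
    llm: Any,
    messages: list[dict[str, Any]],
    tool_schemas: list[dict[str, Any]],
    tools_by_name: dict[str, Callable[..., Any]],
) -> str:
    """Minimal sketch of an executor loop over the new contract."""
    while True:
        answer = llm.call(messages, tools=tool_schemas)  # no available_functions
        if not isinstance(answer, list):
            return answer  # plain text completion: the final answer
        # Record the assistant turn, execute each tool call, feed results back.
        # (Real code would serialize the tool-call objects into dicts first.)
        messages.append({"role": "assistant", "content": "", "tool_calls": answer})
        for tc in answer:
            args = json.loads(tc.function.arguments or "{}")
            result = tools_by_name[tc.function.name](**args)
            messages.append(
                {"role": "tool", "tool_call_id": tc.id, "content": str(result)}
            )
```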

View File

@@ -292,14 +292,16 @@ class BaseLLM(ABC):
from_agent: Agent | None = None,
) -> None:
"""Emit LLM call started event."""
from crewai.utilities.serialization import to_serializable
if not hasattr(crewai_event_bus, "emit"):
raise ValueError("crewai_event_bus does not have an emit method") from None
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
messages=messages,
tools=tools,
messages=to_serializable(messages),
tools=to_serializable(tools),
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
@@ -317,11 +319,13 @@ class BaseLLM(ABC):
messages: str | list[LLMMessage] | None = None,
) -> None:
"""Emit LLM call completed event."""
from crewai.utilities.serialization import to_serializable
crewai_event_bus.emit(
self,
event=LLMCallCompletedEvent(
messages=messages,
response=response,
messages=to_serializable(messages),
response=to_serializable(response),
call_type=call_type,
from_task=from_task,
from_agent=from_agent,
@@ -446,7 +450,7 @@ class BaseLLM(ABC):
from_agent=from_agent,
)
return str(result)
return result
except Exception as e:
error_msg = f"Error executing function '{function_name}': {e!s}"

View File

@@ -418,6 +418,7 @@ class AnthropicCompletion(BaseLLM):
- System messages are separate from conversation messages
- Messages must alternate between user and assistant
- First message must be from user
- Tool results must be in user messages with tool_result content blocks
- When thinking is enabled, assistant messages must start with thinking blocks
Args:
@@ -431,6 +432,7 @@ class AnthropicCompletion(BaseLLM):
formatted_messages: list[LLMMessage] = []
system_message: str | None = None
pending_tool_results: list[dict[str, Any]] = []
for message in base_formatted:
role = message.get("role")
@@ -441,16 +443,47 @@ class AnthropicCompletion(BaseLLM):
system_message += f"\n\n{content}"
else:
system_message = cast(str, content)
else:
role_str = role if role is not None else "user"
elif role == "tool":
tool_call_id = message.get("tool_call_id", "")
if not tool_call_id:
raise ValueError("Tool message missing required tool_call_id")
tool_result = {
"type": "tool_result",
"tool_use_id": tool_call_id,
"content": content if content else "",
}
pending_tool_results.append(tool_result)
elif role == "assistant":
# First, flush any pending tool results as a user message
if pending_tool_results:
formatted_messages.append(
{"role": "user", "content": pending_tool_results}
)
pending_tool_results = []
if isinstance(content, list):
formatted_messages.append({"role": role_str, "content": content})
elif (
role_str == "assistant"
and self.thinking
and self.previous_thinking_blocks
):
# Handle assistant message with tool_calls (convert to Anthropic format)
tool_calls = message.get("tool_calls", [])
if tool_calls:
assistant_content: list[dict[str, Any]] = []
for tc in tool_calls:
if isinstance(tc, dict):
func = tc.get("function", {})
tool_use = {
"type": "tool_use",
"id": tc.get("id", ""),
"name": func.get("name", ""),
"input": json.loads(func.get("arguments", "{}"))
if isinstance(func.get("arguments"), str)
else func.get("arguments", {}),
}
assistant_content.append(tool_use)
if assistant_content:
formatted_messages.append(
{"role": "assistant", "content": assistant_content}
)
elif isinstance(content, list):
formatted_messages.append({"role": "assistant", "content": content})
elif self.thinking and self.previous_thinking_blocks:
structured_content = cast(
list[dict[str, Any]],
[
@@ -459,14 +492,34 @@ class AnthropicCompletion(BaseLLM):
],
)
formatted_messages.append(
LLMMessage(role=role_str, content=structured_content)
LLMMessage(role="assistant", content=structured_content)
)
else:
content_str = content if content is not None else ""
formatted_messages.append(
LLMMessage(role="assistant", content=content_str)
)
else:
# User message - first flush any pending tool results
if pending_tool_results:
formatted_messages.append(
{"role": "user", "content": pending_tool_results}
)
pending_tool_results = []
role_str = role if role is not None else "user"
if isinstance(content, list):
formatted_messages.append({"role": role_str, "content": content})
else:
content_str = content if content is not None else ""
formatted_messages.append(
LLMMessage(role=role_str, content=content_str)
)
# Flush any remaining pending tool results
if pending_tool_results:
formatted_messages.append({"role": "user", "content": pending_tool_results})
# Ensure first message is from user (Anthropic requirement)
if not formatted_messages:
# If no messages, add a default user message
@@ -526,13 +579,26 @@ class AnthropicCompletion(BaseLLM):
return structured_json
# Check if Claude wants to use tools
if response.content and available_functions:
if response.content:
tool_uses = [
block for block in response.content if isinstance(block, ToolUseBlock)
]
if tool_uses:
# Handle tool use conversation flow
# If no available_functions, return tool calls for executor to handle
# This allows the executor to manage tool execution with proper
# message history and post-tool reasoning prompts
if not available_functions:
self._emit_call_completed_event(
response=list(tool_uses),
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return list(tool_uses)
# Handle tool use conversation flow internally
return self._handle_tool_use_conversation(
response,
tool_uses,
@@ -696,7 +762,7 @@ class AnthropicCompletion(BaseLLM):
return structured_json
if final_message.content and available_functions:
if final_message.content:
tool_uses = [
block
for block in final_message.content
@@ -704,7 +770,11 @@ class AnthropicCompletion(BaseLLM):
]
if tool_uses:
# Handle tool use conversation flow
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
# Handle tool use conversation flow internally
return self._handle_tool_use_conversation(
final_message,
tool_uses,
@@ -933,12 +1003,23 @@ class AnthropicCompletion(BaseLLM):
return structured_json
if response.content and available_functions:
if response.content:
tool_uses = [
block for block in response.content if isinstance(block, ToolUseBlock)
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
if not available_functions:
self._emit_call_completed_event(
response=list(tool_uses),
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return list(tool_uses)
return await self._ahandle_tool_use_conversation(
response,
tool_uses,
@@ -1079,7 +1160,7 @@ class AnthropicCompletion(BaseLLM):
return structured_json
if final_message.content and available_functions:
if final_message.content:
tool_uses = [
block
for block in final_message.content
@@ -1087,6 +1168,10 @@ class AnthropicCompletion(BaseLLM):
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
return await self._ahandle_tool_use_conversation(
final_message,
tool_uses,
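To make the new branches concrete, here is an illustrative round-trip (made-up values) of OpenAI-style tool messages into the Anthropic content-block format produced above:

```python
openai_style = [
    {"role": "assistant", "tool_calls": [{
        "id": "toolu_01", "type": "function",
        "function": {"name": "get_weather", "arguments": '{"city": "Paris"}'},
    }]},
    {"role": "tool", "tool_call_id": "toolu_01", "content": "18C, cloudy"},
]
# Expected result after formatting:
anthropic_style = [
    {"role": "assistant", "content": [
        {"type": "tool_use", "id": "toolu_01", "name": "get_weather",
         "input": {"city": "Paris"}},
    ]},
    {"role": "user", "content": [
        {"type": "tool_result", "tool_use_id": "toolu_01", "content": "18C, cloudy"},
    ]},
]
```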

View File

@@ -514,10 +514,32 @@ class AzureCompletion(BaseLLM):
for message in base_formatted:
role = message.get("role", "user") # Default to user if no role
content = message.get("content", "")
# Handle None content - Azure requires string content
content = message.get("content") or ""
# Azure AI Inference requires both 'role' and 'content'
azure_messages.append({"role": role, "content": content})
if role == "tool":
tool_call_id = message.get("tool_call_id", "")
if not tool_call_id:
raise ValueError("Tool message missing required tool_call_id")
azure_messages.append(
{
"role": "tool",
"tool_call_id": tool_call_id,
"content": content,
}
)
# Handle assistant messages with tool_calls
elif role == "assistant" and message.get("tool_calls"):
tool_calls = message.get("tool_calls", [])
azure_msg: LLMMessage = {
"role": "assistant",
"content": content, # Already defaulted to "" above
"tool_calls": tool_calls,
}
azure_messages.append(azure_msg)
else:
# Azure AI Inference requires both 'role' and 'content'
azure_messages.append({"role": role, "content": content})
return azure_messages
@@ -604,6 +626,18 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
)
# If there are tool_calls but no available_functions, return the tool_calls
# This allows the caller (e.g., executor) to handle tool execution
if message.tool_calls and not available_functions:
self._emit_call_completed_event(
response=list(message.tool_calls),
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return list(message.tool_calls)
# Handle tool calls
if message.tool_calls and available_functions:
tool_call = message.tool_calls[0] # Handle first tool call
@@ -775,6 +809,29 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
)
# If there are tool_calls but no available_functions, return them
# in OpenAI-compatible format for executor to handle
if tool_calls and not available_functions:
formatted_tool_calls = [
{
"id": call_data.get("id", f"call_{idx}"),
"type": "function",
"function": {
"name": call_data["name"],
"arguments": call_data["arguments"],
},
}
for idx, call_data in tool_calls.items()
]
self._emit_call_completed_event(
response=formatted_tool_calls,
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return formatted_tool_calls
# Handle completed tool calls
if tool_calls and available_functions:
for call_data in tool_calls.values():
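The list comprehension above normalizes Azure streaming tool calls into the OpenAI wire shape; a runnable illustration with made-up values:

```python
tool_calls = {0: {"id": "call_abc", "name": "search", "arguments": '{"q": "crewai"}'}}
formatted = [
    {
        "id": data.get("id", f"call_{idx}"),
        "type": "function",
        "function": {"name": data["name"], "arguments": data["arguments"]},
    }
    for idx, data in tool_calls.items()
]
assert formatted[0] == {
    "id": "call_abc",
    "type": "function",
    "function": {"name": "search", "arguments": '{"q": "crewai"}'},
}
```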

View File

@@ -330,7 +330,8 @@ class BedrockCompletion(BaseLLM):
cast(object, [{"text": system_message}]),
)
# Add tool config if present
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
@@ -339,6 +340,16 @@ class BedrockCompletion(BaseLLM):
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
# Add optional advanced features if configured
if self.guardrail_config:
@@ -444,6 +455,8 @@ class BedrockCompletion(BaseLLM):
cast(object, [{"text": system_message}]),
)
# Add tool config if present or if messages contain tool content
# Bedrock requires toolConfig when messages have toolUse/toolResult
if tools:
tool_config: ToolConfigurationTypeDef = {
"tools": cast(
@@ -452,6 +465,16 @@ class BedrockCompletion(BaseLLM):
)
}
body["toolConfig"] = tool_config
elif self._messages_contain_tool_content(formatted_messages):
# Create minimal toolConfig from tool history in messages
tools_from_history = self._extract_tools_from_message_history(
formatted_messages
)
if tools_from_history:
body["toolConfig"] = cast(
"ToolConfigurationTypeDef",
cast(object, {"tools": tools_from_history}),
)
if self.guardrail_config:
guardrail_config: GuardrailConfigurationTypeDef = cast(
@@ -546,6 +569,18 @@ class BedrockCompletion(BaseLLM):
"I apologize, but I received an empty response. Please try again."
)
# If there are tool uses but no available_functions, return them for the executor to handle
tool_uses = [block["toolUse"] for block in content if "toolUse" in block]
if tool_uses and not available_functions:
self._emit_call_completed_event(
response=tool_uses,
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages,
)
return tool_uses
# Process content blocks and handle tool use correctly
text_content = ""
@@ -935,6 +970,18 @@ class BedrockCompletion(BaseLLM):
"I apologize, but I received an empty response. Please try again."
)
# If there are tool uses but no available_functions, return them for the executor to handle
tool_uses = [block["toolUse"] for block in content if "toolUse" in block]
if tool_uses and not available_functions:
self._emit_call_completed_event(
response=tool_uses,
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages,
)
return tool_uses
text_content = ""
for content_block in content:
@@ -1266,6 +1313,8 @@ class BedrockCompletion(BaseLLM):
for message in formatted_messages:
role = message.get("role")
content = message.get("content", "")
tool_calls = message.get("tool_calls")
tool_call_id = message.get("tool_call_id")
if role == "system":
# Extract system message - Converse API handles it separately
@@ -1273,9 +1322,49 @@ class BedrockCompletion(BaseLLM):
system_message += f"\n\n{content}"
else:
system_message = cast(str, content)
elif role == "assistant" and tool_calls:
# Convert OpenAI-style tool_calls to Bedrock toolUse format
bedrock_content = []
for tc in tool_calls:
func = tc.get("function", {})
tool_use_block = {
"toolUse": {
"toolUseId": tc.get("id", f"call_{id(tc)}"),
"name": func.get("name", ""),
"input": func.get("arguments", {})
if isinstance(func.get("arguments"), dict)
else json.loads(func.get("arguments", "{}") or "{}"),
}
}
bedrock_content.append(tool_use_block)
converse_messages.append(
{"role": "assistant", "content": bedrock_content}
)
elif role == "tool":
if not tool_call_id:
raise ValueError("Tool message missing required tool_call_id")
converse_messages.append(
{
"role": "user",
"content": [
{
"toolResult": {
"toolUseId": tool_call_id,
"content": [
{"text": str(content) if content else ""}
],
}
}
],
}
)
else:
# Convert to Converse API format with proper content structure
converse_messages.append({"role": role, "content": [{"text": content}]})
# Ensure content is not None
text_content = content if content else ""
converse_messages.append(
{"role": role, "content": [{"text": text_content}]}
)
# CRITICAL: Handle model-specific conversation requirements
# Cohere and some other models require conversation to end with user message
@@ -1325,6 +1414,58 @@ class BedrockCompletion(BaseLLM):
return converse_messages, system_message
@staticmethod
def _messages_contain_tool_content(messages: list[LLMMessage]) -> bool:
"""Check if messages contain toolUse or toolResult content blocks.
Bedrock requires toolConfig when messages have tool-related content.
"""
for message in messages:
content = message.get("content", [])
if isinstance(content, list):
for block in content:
if isinstance(block, dict):
if "toolUse" in block or "toolResult" in block:
return True
return False
@staticmethod
def _extract_tools_from_message_history(
messages: list[LLMMessage],
) -> list[dict[str, Any]]:
"""Extract tool definitions from toolUse blocks in message history.
When no tools are passed but messages contain toolUse, we need to
recreate a minimal toolConfig to satisfy Bedrock's API requirements.
"""
tools: list[dict[str, Any]] = []
seen_tool_names: set[str] = set()
for message in messages:
content = message.get("content", [])
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and "toolUse" in block:
tool_use = block["toolUse"]
tool_name = tool_use.get("name", "")
if tool_name and tool_name not in seen_tool_names:
seen_tool_names.add(tool_name)
# Create a minimal tool spec from the toolUse block
tool_spec: dict[str, Any] = {
"toolSpec": {
"name": tool_name,
"description": f"Tool: {tool_name}",
"inputSchema": {
"json": {
"type": "object",
"properties": {},
}
},
}
}
tools.append(tool_spec)
return tools
@staticmethod
def _format_tools_for_converse(
tools: list[dict[str, Any]],
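_extract_tools_from_message_history exists because Bedrock rejects a conversation replay containing toolUse/toolResult blocks unless a toolConfig is present. Given the logic above, a history like this (hypothetical values) would yield the minimal spec shown:

```python
messages = [
    {"role": "assistant", "content": [
        {"toolUse": {"toolUseId": "t1", "name": "get_weather",
                     "input": {"city": "Paris"}}},
    ]},
]
# Expected minimal reconstruction (schema is intentionally empty):
expected_tool_config = {"tools": [{
    "toolSpec": {
        "name": "get_weather",
        "description": "Tool: get_weather",
        "inputSchema": {"json": {"type": "object", "properties": {}}},
    }
}]}
```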

View File

@@ -531,6 +531,53 @@ class GeminiCompletion(BaseLLM):
system_instruction += f"\n\n{text_content}"
else:
system_instruction = text_content
elif role == "tool":
tool_call_id = message.get("tool_call_id")
if not tool_call_id:
raise ValueError("Tool message missing required tool_call_id")
tool_name = message.get("name", "")
response_data: dict[str, Any]
try:
response_data = json.loads(text_content) if text_content else {}
except (json.JSONDecodeError, TypeError):
response_data = {"result": text_content}
function_response_part = types.Part.from_function_response(
name=tool_name, response=response_data
)
contents.append(
types.Content(role="user", parts=[function_response_part])
)
elif role == "assistant" and message.get("tool_calls"):
parts: list[types.Part] = []
if text_content:
parts.append(types.Part.from_text(text=text_content))
tool_calls: list[dict[str, Any]] = message.get("tool_calls") or []
for tool_call in tool_calls:
func: dict[str, Any] = tool_call.get("function") or {}
func_name: str = str(func.get("name") or "")
func_args_raw: str | dict[str, Any] = func.get("arguments") or {}
func_args: dict[str, Any]
if isinstance(func_args_raw, str):
try:
func_args = (
json.loads(func_args_raw) if func_args_raw else {}
)
except (json.JSONDecodeError, TypeError):
func_args = {}
else:
func_args = func_args_raw
parts.append(
types.Part.from_function_call(name=func_name, args=func_args)
)
contents.append(types.Content(role="model", parts=parts))
else:
# Convert role for Gemini (assistant -> model)
gemini_role = "model" if role == "assistant" else "user"
@@ -653,6 +700,24 @@ class GeminiCompletion(BaseLLM):
if response.candidates and (self.tools or available_functions):
candidate = response.candidates[0]
if candidate.content and candidate.content.parts:
# Collect function call parts
function_call_parts = [
part for part in candidate.content.parts if part.function_call
]
# If there are function calls but no available_functions,
# return them for the executor to handle (like OpenAI/Anthropic)
if function_call_parts and not available_functions:
self._emit_call_completed_event(
response=function_call_parts,
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=self._convert_contents_to_dict(contents),
)
return function_call_parts
# Otherwise execute the tools internally
for part in candidate.content.parts:
if part.function_call:
function_name = part.function_call.name
@@ -675,7 +740,7 @@ class GeminiCompletion(BaseLLM):
if result is not None:
return result
content = response.text or ""
content = self._extract_text_from_response(response)
content = self._apply_stop_words(content)
return self._finalize_completion_response(
@@ -767,7 +832,7 @@ class GeminiCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str:
) -> str | list[dict[str, Any]]:
"""Finalize streaming response with usage tracking, function execution, and events.
Args:
@@ -785,6 +850,29 @@ class GeminiCompletion(BaseLLM):
"""
self._track_token_usage_internal(usage_data)
# If there are function calls but no available_functions,
# return them for the executor to handle
if function_calls and not available_functions:
formatted_function_calls = [
{
"id": call_data["id"],
"function": {
"name": call_data["name"],
"arguments": json.dumps(call_data["args"]),
},
"type": "function",
}
for call_data in function_calls.values()
]
self._emit_call_completed_event(
response=formatted_function_calls,
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=self._convert_contents_to_dict(contents),
)
return formatted_function_calls
# Handle completed function calls
if function_calls and available_functions:
for call_data in function_calls.values():
@@ -1035,6 +1123,35 @@ class GeminiCompletion(BaseLLM):
}
return {"total_tokens": 0}
@staticmethod
def _extract_text_from_response(response: GenerateContentResponse) -> str:
"""Extract text content from Gemini response without triggering warnings.
This method directly accesses the response parts to extract text content,
avoiding the warning that occurs when using response.text on responses
containing non-text parts (e.g., 'thought_signature' from thinking models).
Args:
response: The Gemini API response
Returns:
Concatenated text content from all text parts
"""
if not response.candidates:
return ""
candidate = response.candidates[0]
if not candidate.content or not candidate.content.parts:
return ""
text_parts = [
part.text
for part in candidate.content.parts
if hasattr(part, "text") and part.text
]
return "".join(text_parts)
@staticmethod
def _convert_contents_to_dict(
contents: list[types.Content],
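The Gemini branches above map the OpenAI message shapes onto google-genai parts. A short sketch of both directions (google.genai is the package already used in this file; the values are made up):

```python
import json
from google.genai import types

tool_call = {"id": "call_1", "function": {
    "name": "get_weather", "arguments": '{"city": "Paris"}'}}
func = tool_call["function"]
call_part = types.Part.from_function_call(
    name=func["name"], args=json.loads(func["arguments"])
)
# A "tool" role reply becomes a function_response part on a user turn:
reply_part = types.Part.from_function_response(
    name=func["name"], response={"result": "18C, cloudy"}
)
```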

View File

@@ -428,6 +428,19 @@ class OpenAICompletion(BaseLLM):
choice: Choice = response.choices[0]
message = choice.message
# If there are tool_calls but no available_functions, return the tool_calls
# This allows the caller (e.g., executor) to handle tool execution
if message.tool_calls and not available_functions:
self._emit_call_completed_event(
response=list(message.tool_calls),
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return list(message.tool_calls)
# If there are tool_calls and available_functions, execute the tools
if message.tool_calls and available_functions:
tool_call = message.tool_calls[0]
function_name = tool_call.function.name
@@ -725,6 +738,19 @@ class OpenAICompletion(BaseLLM):
choice: Choice = response.choices[0]
message = choice.message
# If there are tool_calls but no available_functions, return the tool_calls
# This allows the caller (e.g., executor) to handle tool execution
if message.tool_calls and not available_functions:
self._emit_call_completed_event(
response=list(message.tool_calls),
call_type=LLMCallType.TOOL_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return list(message.tool_calls)
# If there are tool_calls and available_functions, execute the tools
if message.tool_calls and available_functions:
tool_call = message.tool_calls[0]
function_name = tool_call.function.name

View File

@@ -2,16 +2,12 @@ import logging
import re
from typing import Any
from crewai.utilities.string_utils import sanitize_tool_name
def validate_function_name(name: str, provider: str = "LLM") -> str:
"""Validate function name according to common LLM provider requirements.
Most LLM providers (OpenAI, Gemini, Anthropic) have similar requirements:
- Must start with letter or underscore
- Only alphanumeric, underscore, dot, colon, dash allowed
- Maximum length of 64 characters
- Cannot be empty
Args:
name: The function name to validate
provider: The provider name for error messages
@@ -35,11 +31,10 @@ def validate_function_name(name: str, provider: str = "LLM") -> str:
f"{provider} function name '{name}' exceeds 64 character limit"
)
# Check for invalid characters (most providers support these)
if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_.\-:]*$", name):
if not re.match(r"^[a-z_][a-z0-9_]*$", name):
raise ValueError(
f"{provider} function name '{name}' contains invalid characters. "
f"Only letters, numbers, underscore, dot, colon, dash allowed"
f"Only lowercase letters, numbers, and underscores allowed"
)
return name
@@ -105,6 +100,18 @@ def log_tool_conversion(tool: dict[str, Any], provider: str) -> None:
logging.error(f"{provider}: Tool structure: {tool}")
def sanitize_function_name(name: str) -> str:
"""Sanitize function name for LLM provider compatibility.
Args:
name: Original function name
Returns:
Sanitized function name (lowercase, a-z0-9_ only, max 64 chars)
"""
return sanitize_tool_name(name)
def safe_tool_conversion(
tool: dict[str, Any], provider: str
) -> tuple[str, str, dict[str, Any]]:
@@ -127,7 +134,10 @@ def safe_tool_conversion(
name, description, parameters = extract_tool_info(tool)
validated_name = validate_function_name(name, provider)
# Sanitize name before validation (replace invalid chars with underscores)
sanitized_name = sanitize_function_name(name)
validated_name = validate_function_name(sanitized_name, provider)
logging.info(f"{provider}: Successfully validated tool '{validated_name}'")
return validated_name, description, parameters
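The pipeline is now sanitize-then-validate, and the stricter validator only accepts names the sanitizer can produce. Expected behavior, assuming sanitize_function_name follows sanitize_tool_name as defined later in this PR:

```python
name = "Get Weather: v2"
sanitized = sanitize_function_name(name)       # -> "get_weather_v2"
validated = validate_function_name(sanitized)  # matches ^[a-z_][a-z0-9_]*$
assert validated == "get_weather_v2"
# Passing the raw name directly would now raise ValueError (space, colon, caps).
```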

View File

@@ -31,6 +31,7 @@ from crewai.mcp.transports.base import BaseTransport
from crewai.mcp.transports.http import HTTPTransport
from crewai.mcp.transports.sse import SSETransport
from crewai.mcp.transports.stdio import StdioTransport
from crewai.utilities.string_utils import sanitize_tool_name
# MCP Connection timeout constants (in seconds)
@@ -418,7 +419,7 @@ class MCPClient:
return [
{
"name": tool.name,
"name": sanitize_tool_name(tool.name),
"description": getattr(tool, "description", ""),
"inputSchema": getattr(tool, "inputSchema", {}),
}

View File

@@ -52,6 +52,7 @@ from crewai.telemetry.utils import (
close_span,
)
from crewai.utilities.logger_utils import suppress_warnings
from crewai.utilities.string_utils import sanitize_tool_name
logger = logging.getLogger(__name__)
@@ -323,7 +324,8 @@ class Telemetry:
),
"max_retry_limit": getattr(agent, "max_retry_limit", 3),
"tools_names": [
tool.name.casefold() for tool in agent.tools or []
sanitize_tool_name(tool.name)
for tool in agent.tools or []
],
# Add agent fingerprint data if sharing crew details
"fingerprint": (
@@ -372,7 +374,8 @@ class Telemetry:
else None
),
"tools_names": [
tool.name.casefold() for tool in task.tools or []
sanitize_tool_name(tool.name)
for tool in task.tools or []
],
# Add task fingerprint data if sharing crew details
"fingerprint": (
@@ -425,7 +428,8 @@ class Telemetry:
),
"max_retry_limit": getattr(agent, "max_retry_limit", 3),
"tools_names": [
tool.name.casefold() for tool in agent.tools or []
sanitize_tool_name(tool.name)
for tool in agent.tools or []
],
}
for agent in crew.agents
@@ -447,7 +451,8 @@ class Telemetry:
),
"agent_key": task.agent.key if task.agent else None,
"tools_names": [
tool.name.casefold() for tool in task.tools or []
sanitize_tool_name(tool.name)
for tool in task.tools or []
],
}
for task in crew.tasks
@@ -832,7 +837,8 @@ class Telemetry:
"llm": agent.llm.model,
"delegation_enabled?": agent.allow_delegation,
"tools_names": [
tool.name.casefold() for tool in agent.tools or []
sanitize_tool_name(tool.name)
for tool in agent.tools or []
],
}
for agent in crew.agents
@@ -858,7 +864,8 @@ class Telemetry:
else None
),
"tools_names": [
tool.name.casefold() for tool in task.tools or []
sanitize_tool_name(tool.name)
for tool in task.tools or []
],
}
for task in crew.tasks

View File

@@ -26,6 +26,7 @@ from typing_extensions import TypeIs
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.string_utils import sanitize_tool_name
_printer = Printer()
@@ -154,7 +155,6 @@ class BaseTool(BaseModel, ABC):
*args: Any,
**kwargs: Any,
) -> Any:
_printer.print(f"Using Tool: {self.name}", color="cyan")
result = self._run(*args, **kwargs)
# If _run is async, we safely run it
@@ -260,10 +260,12 @@ class BaseTool(BaseModel, ABC):
else:
fields[name] = (param_annotation, param.default)
if fields:
args_schema = create_model(f"{tool.name}Input", **fields)
args_schema = create_model(
f"{sanitize_tool_name(tool.name)}_input", **fields
)
else:
args_schema = create_model(
f"{tool.name}Input", __base__=PydanticBaseModel
f"{sanitize_tool_name(tool.name)}_input", __base__=PydanticBaseModel
)
return cls(
@@ -302,7 +304,7 @@ class BaseTool(BaseModel, ABC):
schema = generate_model_description(self.args_schema)
args_json = json.dumps(schema["json_schema"]["schema"], indent=2)
self.description = (
f"Tool Name: {self.name}\n"
f"Tool Name: {sanitize_tool_name(self.name)}\n"
f"Tool Arguments: {args_json}\n"
f"Tool Description: {self.description}"
)
@@ -329,7 +331,6 @@ class Tool(BaseTool, Generic[P, R]):
Returns:
The result of the tool execution.
"""
_printer.print(f"Using Tool: {self.name}", color="cyan")
result = self.func(*args, **kwargs)
if asyncio.iscoroutine(result):
@@ -381,7 +382,7 @@ class Tool(BaseTool, Generic[P, R]):
if _is_awaitable(result):
return await result
raise NotImplementedError(
f"{self.name} does not have an async function. "
f"{sanitize_tool_name(self.name)} does not have an async function. "
"Use run() for sync execution or provide an async function."
)
@@ -423,10 +424,12 @@ class Tool(BaseTool, Generic[P, R]):
else:
fields[name] = (param_annotation, param.default)
if fields:
args_schema = create_model(f"{tool.name}Input", **fields)
args_schema = create_model(
f"{sanitize_tool_name(tool.name)}_input", **fields
)
else:
args_schema = create_model(
f"{tool.name}Input", __base__=PydanticBaseModel
f"{sanitize_tool_name(tool.name)}_input", __base__=PydanticBaseModel
)
return cls(
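Schema model names now derive from the sanitized tool name, so display names with spaces or casing still produce a valid identifier:

```python
from pydantic import BaseModel, create_model
from crewai.utilities.string_utils import sanitize_tool_name

args_schema = create_model(
    f"{sanitize_tool_name('Web Search')}_input", __base__=BaseModel
)
assert args_schema.__name__ == "web_search_input"
```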

View File

@@ -2,6 +2,7 @@ from pydantic import BaseModel, Field
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.string_utils import sanitize_tool_name
class CacheTools(BaseModel):
@@ -13,14 +14,14 @@ class CacheTools(BaseModel):
default_factory=CacheHandler,
)
def tool(self):
def tool(self) -> CrewStructuredTool:
return CrewStructuredTool.from_function(
func=self.hit_cache,
name=self.name,
name=sanitize_tool_name(self.name),
description="Reads directly from the cache",
)
def hit_cache(self, key):
def hit_cache(self, key: str) -> str | None:
split = key.split("tool:")
tool = split[1].split("|input:")[0].strip()
tool_input = split[1].split("|input:")[1].strip()
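For context, hit_cache parses the cache key format used by the tools handler; an illustrative key (values made up) and the parse it implies:

```python
key = "tool: search |input: {'q': 'crewai'}"
split = key.split("tool:")
tool = split[1].split("|input:")[0].strip()        # -> "search"
tool_input = split[1].split("|input:")[1].strip()  # -> "{'q': 'crewai'}"
```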

View File

@@ -10,6 +10,7 @@ from typing import TYPE_CHECKING, Any, get_type_hints
from pydantic import BaseModel, Field, create_model
from crewai.utilities.logger import Logger
from crewai.utilities.string_utils import sanitize_tool_name
if TYPE_CHECKING:
@@ -229,7 +230,7 @@ class CrewStructuredTool:
if self.has_reached_max_usage_count():
raise ToolUsageLimitExceededError(
f"Tool '{self.name}' has reached its maximum usage limit of {self.max_usage_count}. You should not use the {self.name} tool again."
f"Tool '{sanitize_tool_name(self.name)}' has reached its maximum usage limit of {self.max_usage_count}. You should not use the {sanitize_tool_name(self.name)} tool again."
)
self._increment_usage_count()
@@ -261,7 +262,7 @@ class CrewStructuredTool:
if self.has_reached_max_usage_count():
raise ToolUsageLimitExceededError(
f"Tool '{self.name}' has reached its maximum usage limit of {self.max_usage_count}. You should not use the {self.name} tool again."
f"Tool '{sanitize_tool_name(self.name)}' has reached its maximum usage limit of {self.max_usage_count}. You should not use the {sanitize_tool_name(self.name)} tool again."
)
self._increment_usage_count()
@@ -295,6 +296,4 @@ class CrewStructuredTool:
return self.args_schema.model_json_schema()["properties"]
def __repr__(self) -> str:
return (
f"CrewStructuredTool(name='{self.name}', description='{self.description}')"
)
return f"CrewStructuredTool(name='{sanitize_tool_name(self.name)}', description='{self.description}')"

View File

@@ -30,6 +30,7 @@ from crewai.utilities.agent_utils import (
from crewai.utilities.converter import Converter
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import sanitize_tool_name
if TYPE_CHECKING:
@@ -145,7 +146,8 @@ class ToolUsage:
if (
isinstance(tool, CrewStructuredTool)
and tool.name == self._i18n.tools("add_image")["name"] # type: ignore
and sanitize_tool_name(tool.name)
== sanitize_tool_name(self._i18n.tools("add_image")["name"]) # type: ignore
):
try:
return self._use(tool_string=tool_string, tool=tool, calling=calling)
@@ -192,7 +194,8 @@ class ToolUsage:
if (
isinstance(tool, CrewStructuredTool)
and tool.name == self._i18n.tools("add_image")["name"] # type: ignore
and sanitize_tool_name(tool.name)
== sanitize_tool_name(self._i18n.tools("add_image")["name"]) # type: ignore
):
try:
return await self._ause(
@@ -233,7 +236,7 @@ class ToolUsage:
)
self._telemetry.tool_repeated_usage(
llm=self.function_calling_llm,
tool_name=tool.name,
tool_name=sanitize_tool_name(tool.name),
attempts=self._run_attempts,
)
return self._format_result(result=result)
@@ -278,7 +281,7 @@ class ToolUsage:
input_str = str(calling.arguments)
result = self.tools_handler.cache.read(
tool=calling.tool_name, input=input_str
tool=sanitize_tool_name(calling.tool_name), input=input_str
) # type: ignore
from_cache = result is not None
@@ -286,12 +289,15 @@ class ToolUsage:
(
available_tool
for available_tool in self.tools
if available_tool.name == tool.name
if sanitize_tool_name(available_tool.name)
== sanitize_tool_name(tool.name)
),
None,
)
usage_limit_error = self._check_usage_limit(available_tool, tool.name)
usage_limit_error = self._check_usage_limit(
available_tool, sanitize_tool_name(tool.name)
)
if usage_limit_error:
result = usage_limit_error
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
@@ -299,9 +305,9 @@ class ToolUsage:
# Don't return early - fall through to finally block
elif result is None:
try:
if calling.tool_name in [
"Delegate work to coworker",
"Ask question to coworker",
if sanitize_tool_name(calling.tool_name) in [
sanitize_tool_name("Delegate work to coworker"),
sanitize_tool_name("Ask question to coworker"),
]:
coworker = (
calling.arguments.get("coworker")
@@ -333,13 +339,16 @@ class ToolUsage:
if self.tools_handler:
should_cache = True
if (
hasattr(available_tool, "cache_function")
and available_tool.cache_function
):
should_cache = available_tool.cache_function(
calling.arguments, result
)
# Check cache_function on original tool (for tools converted via to_structured_tool)
original_tool = getattr(available_tool, "_original_tool", None)
cache_func = None
if original_tool and hasattr(original_tool, "cache_function"):
cache_func = original_tool.cache_function
elif hasattr(available_tool, "cache_function"):
cache_func = available_tool.cache_function
if cache_func:
should_cache = cache_func(calling.arguments, result)
self.tools_handler.on_tool_use(
calling=calling, output=result, should_cache=should_cache
@@ -347,13 +356,13 @@ class ToolUsage:
self._telemetry.tool_usage(
llm=self.function_calling_llm,
tool_name=tool.name,
tool_name=sanitize_tool_name(tool.name),
attempts=self._run_attempts,
)
result = self._format_result(result=result)
data = {
"result": result,
"tool_name": tool.name,
"tool_name": sanitize_tool_name(tool.name),
"tool_args": calling.arguments,
}
@@ -368,6 +377,19 @@ class ToolUsage:
self.agent.tools_results.append(data)
if available_tool and hasattr(
available_tool, "_increment_usage_count"
):
# Use _increment_usage_count to sync count to original tool
available_tool._increment_usage_count()
if (
hasattr(available_tool, "max_usage_count")
and available_tool.max_usage_count is not None
):
self._printer.print(
content=f"Tool '{sanitize_tool_name(available_tool.name)}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
color="blue",
)
elif available_tool and hasattr(
available_tool, "current_usage_count"
):
available_tool.current_usage_count += 1
@@ -376,7 +398,7 @@ class ToolUsage:
and available_tool.max_usage_count is not None
):
self._printer.print(
content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
content=f"Tool '{sanitize_tool_name(available_tool.name)}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
color="blue",
)
@@ -387,7 +409,11 @@ class ToolUsage:
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
error_message = self._i18n.errors(
"tool_usage_exception"
).format(error=e, tool=tool.name, tool_inputs=tool.description)
).format(
error=e,
tool=sanitize_tool_name(tool.name),
tool_inputs=tool.description,
)
result = ToolUsageError(
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
).message
@@ -434,7 +460,7 @@ class ToolUsage:
)
self._telemetry.tool_repeated_usage(
llm=self.function_calling_llm,
tool_name=tool.name,
tool_name=sanitize_tool_name(tool.name),
attempts=self._run_attempts,
)
return self._format_result(result=result)
@@ -481,7 +507,7 @@ class ToolUsage:
input_str = str(calling.arguments)
result = self.tools_handler.cache.read(
tool=calling.tool_name, input=input_str
tool=sanitize_tool_name(calling.tool_name), input=input_str
) # type: ignore
from_cache = result is not None
@@ -489,12 +515,15 @@ class ToolUsage:
(
available_tool
for available_tool in self.tools
if available_tool.name == tool.name
if sanitize_tool_name(available_tool.name)
== sanitize_tool_name(tool.name)
),
None,
)
usage_limit_error = self._check_usage_limit(available_tool, tool.name)
usage_limit_error = self._check_usage_limit(
available_tool, sanitize_tool_name(tool.name)
)
if usage_limit_error:
result = usage_limit_error
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
@@ -502,9 +531,9 @@ class ToolUsage:
# Don't return early - fall through to finally block
elif result is None:
try:
if calling.tool_name in [
"Delegate work to coworker",
"Ask question to coworker",
if sanitize_tool_name(calling.tool_name) in [
sanitize_tool_name("Delegate work to coworker"),
sanitize_tool_name("Ask question to coworker"),
]:
coworker = (
calling.arguments.get("coworker")
@@ -536,13 +565,16 @@ class ToolUsage:
if self.tools_handler:
should_cache = True
if (
hasattr(available_tool, "cache_function")
and available_tool.cache_function
):
should_cache = available_tool.cache_function(
calling.arguments, result
)
# Check cache_function on original tool (for tools converted via to_structured_tool)
original_tool = getattr(available_tool, "_original_tool", None)
cache_func = None
if original_tool and hasattr(original_tool, "cache_function"):
cache_func = original_tool.cache_function
elif hasattr(available_tool, "cache_function"):
cache_func = available_tool.cache_function
if cache_func:
should_cache = cache_func(calling.arguments, result)
self.tools_handler.on_tool_use(
calling=calling, output=result, should_cache=should_cache
@@ -550,13 +582,13 @@ class ToolUsage:
self._telemetry.tool_usage(
llm=self.function_calling_llm,
tool_name=tool.name,
tool_name=sanitize_tool_name(tool.name),
attempts=self._run_attempts,
)
result = self._format_result(result=result)
data = {
"result": result,
"tool_name": tool.name,
"tool_name": sanitize_tool_name(tool.name),
"tool_args": calling.arguments,
}
@@ -571,6 +603,19 @@ class ToolUsage:
self.agent.tools_results.append(data)
if available_tool and hasattr(
available_tool, "_increment_usage_count"
):
# Use _increment_usage_count to sync count to original tool
available_tool._increment_usage_count()
if (
hasattr(available_tool, "max_usage_count")
and available_tool.max_usage_count is not None
):
self._printer.print(
content=f"Tool '{sanitize_tool_name(available_tool.name)}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
color="blue",
)
elif available_tool and hasattr(
available_tool, "current_usage_count"
):
available_tool.current_usage_count += 1
@@ -579,7 +624,7 @@ class ToolUsage:
and available_tool.max_usage_count is not None
):
self._printer.print(
content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
content=f"Tool '{sanitize_tool_name(available_tool.name)}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
color="blue",
)
@@ -590,7 +635,11 @@ class ToolUsage:
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
error_message = self._i18n.errors(
"tool_usage_exception"
).format(error=e, tool=tool.name, tool_inputs=tool.description)
).format(
error=e,
tool=sanitize_tool_name(tool.name),
tool_inputs=tool.description,
)
result = ToolUsageError(
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
).message
@@ -648,9 +697,10 @@ class ToolUsage:
if not self.tools_handler:
return False
if last_tool_usage := self.tools_handler.last_used_tool:
return (calling.tool_name == last_tool_usage.tool_name) and (
calling.arguments == last_tool_usage.arguments
)
return (
sanitize_tool_name(calling.tool_name)
== sanitize_tool_name(last_tool_usage.tool_name)
) and (calling.arguments == last_tool_usage.arguments)
return False
@staticmethod
@@ -673,20 +723,19 @@ class ToolUsage:
return None
def _select_tool(self, tool_name: str) -> Any:
sanitized_input = sanitize_tool_name(tool_name)
order_tools = sorted(
self.tools,
key=lambda tool: SequenceMatcher(
None, tool.name.lower().strip(), tool_name.lower().strip()
None, sanitize_tool_name(tool.name), sanitized_input
).ratio(),
reverse=True,
)
for tool in order_tools:
sanitized_tool = sanitize_tool_name(tool.name)
if (
tool.name.lower().strip() == tool_name.lower().strip()
or SequenceMatcher(
None, tool.name.lower().strip(), tool_name.lower().strip()
).ratio()
> 0.85
sanitized_tool == sanitized_input
or SequenceMatcher(None, sanitized_tool, sanitized_input).ratio() > 0.85
):
return tool
if self.task:
@@ -771,7 +820,7 @@ class ToolUsage:
return ToolUsageError(f"{self._i18n.errors('tool_arguments_error')}")
return ToolCalling(
tool_name=tool.name,
tool_name=sanitize_tool_name(tool.name),
arguments=arguments,
)
@@ -925,7 +974,7 @@ class ToolUsage:
event_data = {
"run_attempts": self._run_attempts,
"delegations": self.task.delegations if self.task else 0,
"tool_name": tool.name,
"tool_name": sanitize_tool_name(tool.name),
"tool_args": tool_calling.arguments,
"tool_class": tool.__class__.__name__,
"agent_key": (

View File

@@ -11,7 +11,10 @@
"role_playing": "You are {role}. {backstory}\nYour personal goal is: {goal}",
"tools": "\nYou ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\n{tools}\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought: you should always think about what to do\nAction: the action to take, only one name of [{tool_names}], just the name, exactly as it's written.\nAction Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce all necessary information is gathered, return the following format:\n\n```\nThought: I now know the final answer\nFinal Answer: the final answer to the original input question\n```",
"no_tools": "\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!",
"format": "I MUST either use a tool (use one at time) OR give my best final answer not both at the same time. When responding, I must use the following format:\n\n```\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action, dictionary enclosed in curly braces\nObservation: the result of the action\n```\nThis Thought/Action/Action Input/Result can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```",
"native_tools": "\nUse available tools to gather information and complete your task.",
"native_task": "\nCurrent Task: {input}\n\nThis is VERY important to you, your job depends on it!",
"post_tool_reasoning": "Analyze the tool result. If requirements are met, provide the Final Answer. Otherwise, call the next tool. Deliver only the answer without meta-commentary.",
"format": "Decide if you need a tool or can provide the final answer. Use one at a time.\nTo use a tool, use:\nThought: [reasoning]\nAction: [name from {tool_names}]\nAction Input: [JSON object]\n\nTo provide the final answer, use:\nThought: [reasoning]\nFinal Answer: [complete response]",
"final_answer_format": "If you don't need to use any more tools, you must give your best complete final answer, make sure it satisfies the expected criteria, use the EXACT format below:\n\n```\nThought: I now can give a great answer\nFinal Answer: my best complete final answer to the task.\n\n```",
"format_without_tools": "\nSorry, I didn't use the right format. I MUST either use a tool (among the available ones), OR give my best final answer.\nHere is the expected format I must follow:\n\n```\nQuestion: the input question you must answer\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n```\n This Thought/Action/Action Input/Result process can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```",
"task_with_context": "{task}\n\nThis is the context you're working with:\n{context}",

View File

@@ -28,6 +28,7 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
)
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import ColoredText, Printer
from crewai.utilities.string_utils import sanitize_tool_name
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.types import LLMMessage
@@ -96,15 +97,15 @@ def parse_tools(tools: list[BaseTool]) -> list[CrewStructuredTool]:
def get_tool_names(tools: Sequence[CrewStructuredTool | BaseTool]) -> str:
"""Get the names of the tools.
"""Get the sanitized names of the tools.
Args:
tools: List of tools to get names from.
Returns:
Comma-separated string of tool names.
Comma-separated string of sanitized tool names.
"""
return ", ".join([t.name for t in tools])
return ", ".join([sanitize_tool_name(t.name) for t in tools])
def render_text_description_and_args(
@@ -126,6 +127,66 @@ def render_text_description_and_args(
return "\n".join(tool_strings)
def convert_tools_to_openai_schema(
tools: Sequence[BaseTool | CrewStructuredTool],
) -> tuple[list[dict[str, Any]], dict[str, Callable[..., Any]]]:
"""Convert CrewAI tools to OpenAI function calling format.
This function converts CrewAI BaseTool and CrewStructuredTool objects
into the OpenAI-compatible tool schema format that can be passed to
LLM providers for native function calling.
Args:
tools: List of CrewAI tool objects to convert.
Returns:
Tuple containing:
- List of OpenAI-format tool schema dictionaries
- Dict mapping tool names to their callable run() methods
Example:
>>> tools = [CalculatorTool(), SearchTool()]
>>> schemas, functions = convert_tools_to_openai_schema(tools)
>>> # schemas can be passed to llm.call(tools=schemas)
>>> # functions can be passed to llm.call(available_functions=functions)
"""
openai_tools: list[dict[str, Any]] = []
available_functions: dict[str, Callable[..., Any]] = {}
for tool in tools:
# Get the JSON schema for tool parameters
parameters: dict[str, Any] = {}
if hasattr(tool, "args_schema") and tool.args_schema is not None:
try:
parameters = tool.args_schema.model_json_schema()
# Remove title and description from schema root as they're redundant
parameters.pop("title", None)
parameters.pop("description", None)
except Exception:
parameters = {}
# Extract original description from formatted description
# BaseTool formats description as "Tool Name: ...\nTool Arguments: ...\nTool Description: {original}"
description = tool.description
if "Tool Description:" in description:
description = description.split("Tool Description:")[-1].strip()
sanitized_name = sanitize_tool_name(tool.name)
schema: dict[str, Any] = {
"type": "function",
"function": {
"name": sanitized_name,
"description": description,
"parameters": parameters,
},
}
openai_tools.append(schema)
available_functions[sanitized_name] = tool.run # type: ignore[union-attr]
return openai_tools, available_functions
def has_reached_max_iterations(iterations: int, max_iterations: int) -> bool:
"""Check if the maximum number of iterations has been reached.
@@ -252,11 +313,13 @@ def get_llm_response(
messages: list[LLMMessage],
callbacks: list[TokenCalcHandler],
printer: Printer,
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Callable[..., Any]] | None = None,
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | LiteAgent | None = None,
) -> str:
) -> str | Any:
"""Call the LLM and return the response, handling any invalid responses.
Args:
@@ -264,13 +327,16 @@ def get_llm_response(
messages: The messages to send to the LLM.
callbacks: List of callbacks for the LLM call.
printer: Printer instance for output.
tools: Optional list of tool schemas for native function calling.
available_functions: Optional dict mapping function names to callables.
from_task: Optional task context for the LLM call.
from_agent: Optional agent context for the LLM call.
response_model: Optional Pydantic model for structured outputs.
executor_context: Optional executor context for hook invocation.
Returns:
The response from the LLM as a string.
The response from the LLM as a string, or tool call results if
native function calling is used.
Raises:
Exception: If an error occurs.
@@ -285,7 +351,9 @@ def get_llm_response(
try:
answer = llm.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent, # type: ignore[arg-type]
response_model=response_model,
@@ -307,11 +375,13 @@ async def aget_llm_response(
messages: list[LLMMessage],
callbacks: list[TokenCalcHandler],
printer: Printer,
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Callable[..., Any]] | None = None,
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | None = None,
) -> str:
) -> str | Any:
"""Call the LLM asynchronously and return the response.
Args:
@@ -319,13 +389,16 @@ async def aget_llm_response(
messages: The messages to send to the LLM.
callbacks: List of callbacks for the LLM call.
printer: Printer instance for output.
tools: Optional list of tool schemas for native function calling.
available_functions: Optional dict mapping function names to callables.
from_task: Optional task context for the LLM call.
from_agent: Optional agent context for the LLM call.
response_model: Optional Pydantic model for structured outputs.
executor_context: Optional executor context for hook invocation.
Returns:
The response from the LLM as a string.
The response from the LLM as a string, or tool call results if
native function calling is used.
Raises:
Exception: If an error occurs.
@@ -339,7 +412,9 @@ async def aget_llm_response(
try:
answer = await llm.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent, # type: ignore[arg-type]
response_model=response_model,
@@ -744,6 +819,71 @@ def load_agent_from_repository(from_repository: str) -> dict[str, Any]:
return attributes
DELEGATION_TOOL_NAMES: Final[frozenset[str]] = frozenset(
[
sanitize_tool_name("Delegate work to coworker"),
sanitize_tool_name("Ask question to coworker"),
]
)
# Native tool calling: track delegations triggered via tool calls
def track_delegation_if_needed(
tool_name: str,
tool_args: dict[str, Any],
task: Task | None,
) -> None:
"""Track delegation if the tool is a delegation tool.
Args:
tool_name: Name of the tool being executed.
tool_args: Arguments passed to the tool.
task: The task being executed (used to track delegations).
"""
if sanitize_tool_name(tool_name) in DELEGATION_TOOL_NAMES and task is not None:
coworker = tool_args.get("coworker")
task.increment_delegations(coworker)
def extract_tool_call_info(
tool_call: Any,
) -> tuple[str, str, dict[str, Any] | str] | None:
"""Extract tool call ID, name, and arguments from various provider formats.
Args:
tool_call: The tool call object to extract info from.
Returns:
Tuple of (call_id, func_name, func_args) or None if format is unrecognized.
"""
if hasattr(tool_call, "function"):
# OpenAI-style: has .function.name and .function.arguments
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
return (
call_id,
sanitize_tool_name(tool_call.function.name),
tool_call.function.arguments,
)
if hasattr(tool_call, "function_call") and tool_call.function_call:
# Gemini-style: has .function_call.name and .function_call.args
call_id = f"call_{id(tool_call)}"
return (
call_id,
sanitize_tool_name(tool_call.function_call.name),
dict(tool_call.function_call.args) if tool_call.function_call.args else {},
)
if hasattr(tool_call, "name") and hasattr(tool_call, "input"):
# Anthropic format: has .name and .input (ToolUseBlock)
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
return call_id, sanitize_tool_name(tool_call.name), tool_call.input
if isinstance(tool_call, dict):
# Support OpenAI "id", Bedrock "toolUseId", or generate one
call_id = (
tool_call.get("id") or tool_call.get("toolUseId") or f"call_{id(tool_call)}"
)
func_info = tool_call.get("function", {})
func_name = func_info.get("name", "") or tool_call.get("name", "")
func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
return call_id, sanitize_tool_name(func_name), func_args
return None
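extract_tool_call_info gives the executor a single shape regardless of provider; an illustrative OpenAI-style dict input (hypothetical values):

```python
openai_call = {
    "id": "call_1",
    "function": {"name": "Get Weather", "arguments": '{"city": "Paris"}'},
}
info = extract_tool_call_info(openai_call)
# -> ("call_1", "get_weather", '{"city": "Paris"}')  # name sanitized
```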
def _setup_before_llm_call_hooks(
executor_context: CrewAgentExecutor | LiteAgent | None, printer: Printer
) -> bool:

View File

@@ -22,7 +22,9 @@ class SystemPromptResult(StandardPromptResult):
user: Annotated[str, "The user prompt component"]
COMPONENTS = Literal["role_playing", "tools", "no_tools", "task"]
COMPONENTS = Literal[
"role_playing", "tools", "no_tools", "native_tools", "task", "native_task"
]
class Prompts(BaseModel):
@@ -36,6 +38,10 @@ class Prompts(BaseModel):
has_tools: bool = Field(
default=False, description="Indicates if the agent has access to tools"
)
use_native_tool_calling: bool = Field(
default=False,
description="Whether to use native function calling instead of ReAct format",
)
system_template: str | None = Field(
default=None, description="Custom system prompt template"
)
@@ -58,12 +64,22 @@ class Prompts(BaseModel):
A dictionary containing the constructed prompt(s).
"""
slices: list[COMPONENTS] = ["role_playing"]
# When using native tool calling with tools, use native_tools instructions
# When using ReAct pattern with tools, use tools instructions
# When no tools are available, use no_tools instructions
if self.has_tools:
slices.append("tools")
if not self.use_native_tool_calling:
slices.append("tools")
else:
slices.append("no_tools")
system: str = self._build_prompt(slices)
slices.append("task")
# Use native_task for native tool calling (no "Thought:" prompt)
# Use task for ReAct pattern (includes "Thought:" prompt)
task_slice: COMPONENTS = (
"native_task" if self.use_native_tool_calling else "task"
)
slices.append(task_slice)
if (
not self.system_template
@@ -72,7 +88,7 @@ class Prompts(BaseModel):
):
return SystemPromptResult(
system=system,
user=self._build_prompt(["task"]),
user=self._build_prompt([task_slice]),
prompt=self._build_prompt(slices),
)
return StandardPromptResult(

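The slice selection above reduces to a small branching rule. A standalone sketch of that rule as the hunk shows it (the function name and asserts are illustrative, not project code):

def select_slices(has_tools: bool, use_native_tool_calling: bool) -> list[str]:
    slices = ["role_playing"]
    if has_tools:
        # Native tool calling hands tool schemas to the provider directly,
        # so the ReAct "tools" instructions are skipped.
        if not use_native_tool_calling:
            slices.append("tools")
    else:
        slices.append("no_tools")
    # native_task omits the trailing "Thought:" cue that drives the ReAct loop
    slices.append("native_task" if use_native_tool_calling else "task")
    return slices

assert select_slices(True, False) == ["role_playing", "tools", "task"]
assert select_slices(True, True) == ["role_playing", "native_task"]
assert select_slices(False, True) == ["role_playing", "no_tools", "native_task"]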
View File

@@ -13,6 +13,7 @@ from crewai.events.types.reasoning_events import (
)
from crewai.llm import LLM
from crewai.task import Task
from crewai.utilities.string_utils import sanitize_tool_name
class ReasoningPlan(BaseModel):
@@ -340,7 +341,9 @@ class AgentReasoning:
str: Comma-separated list of tool names.
"""
try:
return ", ".join([tool.name for tool in (self.task.tools or [])])
return ", ".join(
[sanitize_tool_name(tool.name) for tool in (self.task.tools or [])]
)
except (AttributeError, TypeError):
return "No tools available"

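For reference, a couple of hypothetical tool names run through the sanitized join above (import path as in the diff):

from crewai.utilities.string_utils import sanitize_tool_name

names = ["Search Web", "fetchHTML"]
print(", ".join(sanitize_tool_name(n) for n in names))
# -> search_web, fetch_html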
View File

@@ -66,11 +66,23 @@ def to_serializable(
if key not in exclude
}
if isinstance(obj, BaseModel):
-    return to_serializable(
-        obj=obj.model_dump(exclude=exclude),
-        max_depth=max_depth,
-        _current_depth=_current_depth + 1,
-    )
+    try:
+        return to_serializable(
+            obj=obj.model_dump(exclude=exclude),
+            max_depth=max_depth,
+            _current_depth=_current_depth + 1,
+        )
+    except Exception:
+        try:
+            return {
+                _to_serializable_key(k): to_serializable(
+                    v, max_depth=max_depth, _current_depth=_current_depth + 1
+                )
+                for k, v in obj.__dict__.items()
+                if k not in (exclude or set())
+            }
+        except Exception:
+            return repr(obj)
return repr(obj)

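The three-level fallback above matters because model_dump() can raise when a field's serializer or validator misbehaves (e.g. a computed field or custom serializer that throws). A standalone sketch of the same chain (the helper name is a stand-in, not the project's function):

from typing import Any

def dump_best_effort(obj: Any, exclude: set[str] | None = None) -> Any:
    try:
        return obj.model_dump(exclude=exclude)  # pydantic's structured dump
    except Exception:
        try:
            # Fall back to raw attributes when a field serializer raises
            return {k: v for k, v in obj.__dict__.items() if k not in (exclude or set())}
        except Exception:
            return repr(obj)  # last resort: a readable marker instead of a crash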
View File

@@ -18,6 +18,7 @@ from crewai.types.streaming import (
StreamChunkType,
ToolCallChunk,
)
from crewai.utilities.string_utils import sanitize_tool_name
class TaskInfo(TypedDict):
@@ -58,7 +59,7 @@ def _extract_tool_call_info(
StreamChunkType.TOOL_CALL,
ToolCallChunk(
tool_id=event.tool_call.id,
-tool_name=event.tool_call.function.name,
+tool_name=sanitize_tool_name(event.tool_call.function.name),
arguments=event.tool_call.function.arguments,
index=event.tool_call.index,
),

View File

@@ -1,8 +1,48 @@
# sanitize_tool_name adapted from python-slugify by Val Neekman
# https://github.com/un33k/python-slugify
# MIT License
import re
from typing import Any, Final
import unicodedata
_VARIABLE_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{([A-Za-z_][A-Za-z0-9_\-]*)}")
_QUOTE_PATTERN: Final[re.Pattern[str]] = re.compile(r"[\'\"]+")
_CAMEL_LOWER_UPPER: Final[re.Pattern[str]] = re.compile(r"([a-z])([A-Z])")
_CAMEL_UPPER_LOWER: Final[re.Pattern[str]] = re.compile(r"([A-Z]+)([A-Z][a-z])")
_DISALLOWED_CHARS_PATTERN: Final[re.Pattern[str]] = re.compile(r"[^a-zA-Z0-9]+")
_DUPLICATE_UNDERSCORE_PATTERN: Final[re.Pattern[str]] = re.compile(r"_+")
_MAX_TOOL_NAME_LENGTH: Final[int] = 64
def sanitize_tool_name(name: str, max_length: int = _MAX_TOOL_NAME_LENGTH) -> str:
"""Sanitize tool name for LLM provider compatibility.
Normalizes Unicode, splits camelCase, lowercases, replaces invalid characters
with underscores, and truncates to max_length. Conforms to OpenAI/Bedrock requirements.
Args:
name: Original tool name.
max_length: Maximum allowed length (default 64 per OpenAI/Bedrock limits).
Returns:
Sanitized tool name (lowercase, a-z0-9_ only, max 64 chars).
"""
name = unicodedata.normalize("NFKD", name)
name = name.encode("ascii", "ignore").decode("ascii")
name = _CAMEL_UPPER_LOWER.sub(r"\1_\2", name)
name = _CAMEL_LOWER_UPPER.sub(r"\1_\2", name)
name = name.lower()
name = _QUOTE_PATTERN.sub("", name)
name = _DISALLOWED_CHARS_PATTERN.sub("_", name)
name = _DUPLICATE_UNDERSCORE_PATTERN.sub("_", name)
name = name.strip("_")
if len(name) > max_length:
name = name[:max_length].rstrip("_")
return name
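A few illustrative inputs and their expected outputs, traced through the pipeline above (not part of the diff):

sanitize_tool_name("SearchWebTool")      # -> "search_web_tool"  (camelCase split)
sanitize_tool_name("HTTPRequest")        # -> "http_request"     (acronym boundary)
sanitize_tool_name("Get Weather (v2)!")  # -> "get_weather_v2"   (punctuation collapsed)
sanitize_tool_name("Café Finder")        # -> "cafe_finder"      (Unicode folded to ASCII)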
def interpolate_only(

View File

@@ -15,6 +15,7 @@ from crewai.tools.tool_types import ToolResult
from crewai.tools.tool_usage import ToolUsage, ToolUsageError
from crewai.utilities.i18n import I18N
from crewai.utilities.logger import Logger
from crewai.utilities.string_utils import sanitize_tool_name
if TYPE_CHECKING:
@@ -63,7 +64,7 @@ async def aexecute_tool_and_check_finality(
treated as a final answer.
"""
logger = Logger(verbose=crew.verbose if crew else False)
-tool_name_to_tool_map = {tool.name: tool for tool in tools}
+tool_name_to_tool_map = {sanitize_tool_name(tool.name): tool for tool in tools}
if agent_key and agent_role and agent:
fingerprint_context = fingerprint_context or {}
@@ -90,19 +91,9 @@ async def aexecute_tool_and_check_finality(
if isinstance(tool_calling, ToolUsageError):
return ToolResult(tool_calling.message, False)
-if tool_calling.tool_name.casefold().strip() in [
-    name.casefold().strip() for name in tool_name_to_tool_map
-] or tool_calling.tool_name.casefold().replace("_", " ") in [
-    name.casefold().strip() for name in tool_name_to_tool_map
-]:
-    tool = tool_name_to_tool_map.get(tool_calling.tool_name)
-    if not tool:
-        tool_result = i18n.errors("wrong_tool_name").format(
-            tool=tool_calling.tool_name,
-            tools=", ".join([t.name.casefold() for t in tools]),
-        )
-        return ToolResult(result=tool_result, result_as_answer=False)
+sanitized_tool_name = sanitize_tool_name(tool_calling.tool_name)
+tool = tool_name_to_tool_map.get(sanitized_tool_name)
+if tool:
tool_input = tool_calling.arguments if tool_calling.arguments else {}
hook_context = ToolCallHookContext(
tool_name=tool_calling.tool_name,
@@ -152,8 +143,8 @@ async def aexecute_tool_and_check_finality(
return ToolResult(modified_result, tool.result_as_answer)
tool_result = i18n.errors("wrong_tool_name").format(
-    tool=tool_calling.tool_name,
-    tools=", ".join([tool.name.casefold() for tool in tools]),
+    tool=sanitized_tool_name,
+    tools=", ".join(tool_name_to_tool_map.keys()),
)
return ToolResult(result=tool_result, result_as_answer=False)
@@ -193,7 +184,7 @@ def execute_tool_and_check_finality(
ToolResult containing the execution result and whether it should be treated as a final answer
"""
logger = Logger(verbose=crew.verbose if crew else False)
-tool_name_to_tool_map = {tool.name: tool for tool in tools}
+tool_name_to_tool_map = {sanitize_tool_name(tool.name): tool for tool in tools}
if agent_key and agent_role and agent:
fingerprint_context = fingerprint_context or {}
@@ -206,7 +197,6 @@ def execute_tool_and_check_finality(
except Exception as e:
raise ValueError(f"Failed to set fingerprint: {e}") from e
-# Create tool usage instance
tool_usage = ToolUsage(
tools_handler=tools_handler,
tools=tools,
@@ -216,26 +206,14 @@ def execute_tool_and_check_finality(
action=agent_action,
)
-# Parse tool calling
tool_calling = tool_usage.parse_tool_calling(agent_action.text)
if isinstance(tool_calling, ToolUsageError):
return ToolResult(tool_calling.message, False)
-# Check if tool name matches
-if tool_calling.tool_name.casefold().strip() in [
-    name.casefold().strip() for name in tool_name_to_tool_map
-] or tool_calling.tool_name.casefold().replace("_", " ") in [
-    name.casefold().strip() for name in tool_name_to_tool_map
-]:
-    tool = tool_name_to_tool_map.get(tool_calling.tool_name)
-    if not tool:
-        tool_result = i18n.errors("wrong_tool_name").format(
-            tool=tool_calling.tool_name,
-            tools=", ".join([t.name.casefold() for t in tools]),
-        )
-        return ToolResult(result=tool_result, result_as_answer=False)
+sanitized_tool_name = sanitize_tool_name(tool_calling.tool_name)
+tool = tool_name_to_tool_map.get(sanitized_tool_name)
+if tool:
tool_input = tool_calling.arguments if tool_calling.arguments else {}
hook_context = ToolCallHookContext(
tool_name=tool_calling.tool_name,
@@ -285,9 +263,8 @@ def execute_tool_and_check_finality(
return ToolResult(modified_result, tool.result_as_answer)
# Handle invalid tool name
tool_result = i18n.errors("wrong_tool_name").format(
-    tool=tool_calling.tool_name,
-    tools=", ".join([tool.name.casefold() for tool in tools]),
+    tool=sanitized_tool_name,
+    tools=", ".join(tool_name_to_tool_map.keys()),
)
return ToolResult(result=tool_result, result_as_answer=False)

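Both the sync and async paths now share the same lookup discipline: keys are sanitized once when the map is built, and the model's echoed name is sanitized before the get(). A small standalone sketch (FakeTool is a stand-in type; the sanitize_tool_name import path matches the diff):

from dataclasses import dataclass

from crewai.utilities.string_utils import sanitize_tool_name

@dataclass
class FakeTool:
    name: str

tools = [FakeTool("Search Web"), FakeTool("fetchHTML")]
tool_name_to_tool_map = {sanitize_tool_name(t.name): t for t in tools}

# The model may echo a name in any spelling; sanitizing both sides makes
# "Search Web", "search web", and "Search_Web" resolve to the same entry.
assert tool_name_to_tool_map[sanitize_tool_name("search web")] is tools[0]
assert tool_name_to_tool_map[sanitize_tool_name("Search_Web")] is tools[0]
assert tool_name_to_tool_map.get(sanitize_tool_name("No Such Tool")) is None  # -> wrong_tool_name error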
View File

@@ -2,7 +2,7 @@
from typing import Any, Literal
-from typing_extensions import TypedDict
+from typing_extensions import NotRequired, TypedDict
class LLMMessage(TypedDict):
@@ -13,5 +13,8 @@ class LLMMessage(TypedDict):
instead of str | list[dict[str, str]]
"""
role: Literal["user", "assistant", "system"]
content: str | list[dict[str, Any]]
role: Literal["user", "assistant", "system", "tool"]
content: str | list[dict[str, Any]] | None
tool_call_id: NotRequired[str]
name: NotRequired[str]
tool_calls: NotRequired[list[dict[str, Any]]]
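With the widened TypedDict, an assistant turn that only issues native tool calls and the follow-up tool-result turn both type-check as LLMMessage. Illustrative values (the shapes mirror the OpenAI chat format these keys follow):

assistant_turn: LLMMessage = {
    "role": "assistant",
    "content": None,  # None is now legal when the turn carries only tool calls
    "tool_calls": [
        {
            "id": "call_abc123",
            "type": "function",
            "function": {"name": "search_web", "arguments": '{"query": "crewai"}'},
        }
    ],
}
tool_result_turn: LLMMessage = {
    "role": "tool",  # newly allowed role
    "tool_call_id": "call_abc123",  # ties the result back to the originating call
    "name": "search_web",
    "content": "Top result: https://crewai.com",
}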