wip: clean

This commit is contained in:
lorenzejay
2026-01-14 12:08:41 -08:00
parent 9edbf89b68
commit 6c5e5056f3
17 changed files with 1874 additions and 55 deletions

View File

@@ -709,9 +709,17 @@ class Agent(BaseAgent):
raw_tools: list[BaseTool] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)
use_native_tool_calling = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and len(raw_tools) > 0
)
prompt = Prompts(
agent=self,
has_tools=len(raw_tools) > 0,
use_native_tool_calling=use_native_tool_calling,
i18n=self.i18n,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
@@ -719,6 +727,8 @@ class Agent(BaseAgent):
response_template=self.response_template,
).task_execution()
print("prompt", prompt)
stop_words = [self.i18n.slice("observation")]
if self.response_template:

View File

@@ -236,14 +236,30 @@ def process_tool_results(agent: Agent, result: Any) -> Any:
def save_last_messages(agent: Agent) -> None:
"""Save the last messages from agent executor.
Sanitizes messages to be compatible with TaskOutput's LLMMessage type,
which only accepts 'user', 'assistant', 'system' roles and requires
content to be a string or list (not None).
Args:
agent: The agent instance.
"""
agent._last_messages = (
agent.agent_executor.messages.copy()
if agent.agent_executor and hasattr(agent.agent_executor, "messages")
else []
)
if not agent.agent_executor or not hasattr(agent.agent_executor, "messages"):
agent._last_messages = []
return
sanitized_messages = []
for msg in agent.agent_executor.messages:
role = msg.get("role", "")
# Only include messages with valid LLMMessage roles
if role not in ("user", "assistant", "system"):
continue
# Ensure content is not None (can happen with tool call assistant messages)
content = msg.get("content")
if content is None:
content = ""
sanitized_messages.append({"role": role, "content": content})
agent._last_messages = sanitized_messages
def prepare_tools(

View File

@@ -30,6 +30,7 @@ from crewai.hooks.llm_hooks import (
)
from crewai.utilities.agent_utils import (
aget_llm_response,
convert_tools_to_openai_schema,
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
@@ -215,6 +216,33 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _invoke_loop(self) -> AgentFinish:
"""Execute agent loop until completion.
Checks if the LLM supports native function calling and uses that
approach if available, otherwise falls back to the ReAct text pattern.
Returns:
Final answer from the agent.
"""
# Check if model supports native function calling
use_native_tools = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and self.original_tools
)
if use_native_tools:
return self._invoke_loop_native_tools()
# Fall back to ReAct text-based pattern
return self._invoke_loop_react()
def _invoke_loop_react(self) -> AgentFinish:
"""Execute agent loop using ReAct text-based pattern.
This is the traditional approach where tool definitions are embedded
in the prompt and the LLM outputs Action/Action Input text that is
parsed to execute tools.
Returns:
Final answer from the agent.
"""
@@ -244,6 +272,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
response_model=self.response_model,
executor_context=self,
)
print("--------------------------------")
print("get_llm_response answer", answer)
print("--------------------------------")
# breakpoint()
if self.response_model is not None:
try:
self.response_model.model_validate_json(answer)
@@ -333,6 +365,338 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
def _invoke_loop_native_tools(self) -> AgentFinish:
"""Execute agent loop using native function calling.
This method uses the LLM's native tool/function calling capability
instead of the text-based ReAct pattern. The LLM directly returns
structured tool calls which are executed and results fed back.
Returns:
Final answer from the agent.
"""
print("--------------------------------")
print("invoke_loop_native_tools")
print("--------------------------------")
# Convert tools to OpenAI schema format
if not self.original_tools:
# No tools available, fall back to simple LLM call
return self._invoke_loop_native_no_tools()
openai_tools, available_functions = convert_tools_to_openai_schema(
self.original_tools
)
while True:
try:
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
None,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
)
self._show_logs(formatted_answer)
return formatted_answer
enforce_rpm_limit(self.request_within_rpm_limit)
# Debug: Show messages being sent to LLM
print("--------------------------------")
print(f"Messages count: {len(self.messages)}")
for i, msg in enumerate(self.messages):
role = msg.get("role", "unknown")
content = msg.get("content", "")
if content:
preview = (
content[:200] + "..." if len(content) > 200 else content
)
else:
preview = "(no content)"
print(f" [{i}] {role}: {preview}")
print("--------------------------------")
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution
# via _handle_native_tool_calls to properly manage message history.
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
tools=openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
print("--------------------------------")
print("invoke_loop_native_tools answer", answer)
print("--------------------------------")
# print("get_llm_response answer", answer[:500] + "...")
# Check if the response is a list of tool calls
if (
isinstance(answer, list)
and answer
and self._is_tool_call_list(answer)
):
# Handle tool calls - execute tools and add results to messages
self._handle_native_tool_calls(answer, available_functions)
# Continue loop to let LLM analyze results and decide next steps
continue
# Text or other response - handle as potential final answer
if isinstance(answer, str):
# Text response - this is the final answer
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(formatted_answer)
self._append_message(answer) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
# Unexpected response type, treat as final answer
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(formatted_answer)
self._append_message(str(answer)) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
raise e
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
continue
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
def _invoke_loop_native_no_tools(self) -> AgentFinish:
"""Execute a simple LLM call when no tools are available.
Returns:
Final answer from the agent.
"""
enforce_rpm_limit(self.request_within_rpm_limit)
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._show_logs(formatted_answer)
return formatted_answer
def _is_tool_call_list(self, response: list[Any]) -> bool:
"""Check if a response is a list of tool calls.
Args:
response: The response to check.
Returns:
True if the response appears to be a list of tool calls.
"""
if not response:
return False
first_item = response[0]
# OpenAI-style
if hasattr(first_item, "function") or (
isinstance(first_item, dict) and "function" in first_item
):
return True
# Anthropic-style
if (
hasattr(first_item, "type")
and getattr(first_item, "type", None) == "tool_use"
):
return True
if hasattr(first_item, "name") and hasattr(first_item, "input"):
return True
# Gemini-style
if hasattr(first_item, "function_call") and first_item.function_call:
return True
return False
def _handle_native_tool_calls(
self,
tool_calls: list[Any],
available_functions: dict[str, Callable[..., Any]],
) -> None:
"""Handle a single native tool call from the LLM.
Executes only the FIRST tool call and appends the result to message history.
This enables sequential tool execution with reflection after each tool,
allowing the LLM to reason about results before deciding on next steps.
Args:
tool_calls: List of tool calls from the LLM (only first is processed).
available_functions: Dict mapping function names to callables.
"""
from datetime import datetime
import json
from crewai.events import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
if not tool_calls:
return
# Only process the FIRST tool call for sequential execution with reflection
tool_call = tool_calls[0]
# Extract tool call info - handle OpenAI-style, Anthropic-style, and Gemini-style
if hasattr(tool_call, "function"):
# OpenAI-style: has .function.name and .function.arguments
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.function.name
func_args = tool_call.function.arguments
elif hasattr(tool_call, "function_call") and tool_call.function_call:
# Gemini-style: has .function_call.name and .function_call.args
call_id = f"call_{id(tool_call)}"
func_name = tool_call.function_call.name
func_args = (
dict(tool_call.function_call.args)
if tool_call.function_call.args
else {}
)
elif hasattr(tool_call, "name") and hasattr(tool_call, "input"):
# Anthropic format: has .name and .input (ToolUseBlock)
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.name
func_args = tool_call.input # Already a dict in Anthropic
elif isinstance(tool_call, dict):
call_id = tool_call.get("id", f"call_{id(tool_call)}")
func_info = tool_call.get("function", {})
func_name = func_info.get("name", "") or tool_call.get("name", "")
func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
else:
return
# Append assistant message with single tool call
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": func_args
if isinstance(func_args, str)
else json.dumps(func_args),
},
}
],
}
self.messages.append(assistant_message)
# Parse arguments for the single tool call
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
),
)
# Execute the tool
print(f"Using Tool: {func_name}")
result = "Tool not found"
if func_name in available_functions:
try:
tool_func = available_functions[func_name]
result = tool_func(**args_dict)
if not isinstance(result, str):
result = str(result)
except Exception as e:
result = f"Error executing tool: {e}"
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"content": result,
}
self.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
self._printer.print(
content=f"Tool {func_name} executed with result: {result[:200]}...",
color="green",
)
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.messages.append(reasoning_message)
async def ainvoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute the agent asynchronously with given inputs.
@@ -382,6 +746,29 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
async def _ainvoke_loop(self) -> AgentFinish:
"""Execute agent loop asynchronously until completion.
Checks if the LLM supports native function calling and uses that
approach if available, otherwise falls back to the ReAct text pattern.
Returns:
Final answer from the agent.
"""
# Check if model supports native function calling
use_native_tools = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and self.original_tools
)
if use_native_tools:
return await self._ainvoke_loop_native_tools()
# Fall back to ReAct text-based pattern
return await self._ainvoke_loop_react()
async def _ainvoke_loop_react(self) -> AgentFinish:
"""Execute agent loop asynchronously using ReAct text-based pattern.
Returns:
Final answer from the agent.
"""
@@ -495,6 +882,139 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
async def _ainvoke_loop_native_tools(self) -> AgentFinish:
"""Execute agent loop asynchronously using native function calling.
This method uses the LLM's native tool/function calling capability
instead of the text-based ReAct pattern.
Returns:
Final answer from the agent.
"""
# Convert tools to OpenAI schema format
if not self.original_tools:
return await self._ainvoke_loop_native_no_tools()
openai_tools, available_functions = convert_tools_to_openai_schema(
self.original_tools
)
while True:
try:
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
None,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
)
self._show_logs(formatted_answer)
return formatted_answer
enforce_rpm_limit(self.request_within_rpm_limit)
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution
# via _handle_native_tool_calls to properly manage message history.
answer = await aget_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
tools=openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
print("--------------------------------")
print("native llm completion answer", answer)
print("--------------------------------")
# Check if the response is a list of tool calls
if (
isinstance(answer, list)
and answer
and self._is_tool_call_list(answer)
):
# Handle tool calls - execute tools and add results to messages
self._handle_native_tool_calls(answer, available_functions)
# Continue loop to let LLM analyze results and decide next steps
continue
# Text or other response - handle as potential final answer
if isinstance(answer, str):
# Text response - this is the final answer
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(formatted_answer)
self._append_message(answer) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
# Unexpected response type, treat as final answer
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(formatted_answer)
self._append_message(str(answer)) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
raise e
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
continue
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
async def _ainvoke_loop_native_no_tools(self) -> AgentFinish:
"""Execute a simple async LLM call when no tools are available.
Returns:
Final answer from the agent.
"""
enforce_rpm_limit(self.request_within_rpm_limit)
answer = await aget_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._show_logs(formatted_answer)
return formatted_answer
def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> AgentAction | AgentFinish:

View File

@@ -378,6 +378,12 @@ class EventListener(BaseEventListener):
self.formatter.handle_llm_tool_usage_finished(
event.tool_name,
)
else:
self.formatter.handle_tool_usage_finished(
event.tool_name,
event.output,
getattr(event, "run_attempts", None),
)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source: Any, event: ToolUsageErrorEvent) -> None:

View File

@@ -366,6 +366,32 @@ To enable tracing, do any one of these:
self.print_panel(content, f"🔧 Tool Execution Started (#{iteration})", "yellow")
def handle_tool_usage_finished(
self,
tool_name: str,
output: str,
run_attempts: int | None = None,
) -> None:
"""Handle tool usage finished event with panel display."""
if not self.verbose:
return
iteration = self.tool_usage_counts.get(tool_name, 1)
content = Text()
content.append("Tool Completed\n", style="green bold")
content.append("Tool: ", style="white")
content.append(f"{tool_name}\n", style="green bold")
if output:
content.append("Output: ", style="white")
content.append(f"{output}\n", style="green")
self.print_panel(
content, f"✅ Tool Execution Completed (#{iteration})", "green"
)
def handle_tool_usage_error(
self,
tool_name: str,

View File

@@ -1,6 +1,8 @@
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime
import json
import threading
from typing import TYPE_CHECKING, Any, Literal, cast
from uuid import uuid4
@@ -17,16 +19,24 @@ from crewai.agents.parser import (
OutputParserError,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled_in_context,
)
from crewai.events.types.logging_events import (
AgentLogsExecutionEvent,
AgentLogsStartedEvent,
)
from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.flow.flow import Flow, listen, or_, router, start
from crewai.hooks.llm_hooks import (
get_after_llm_call_hooks,
get_before_llm_call_hooks,
)
from crewai.utilities.agent_utils import (
convert_tools_to_openai_schema,
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
@@ -71,6 +81,8 @@ class AgentReActState(BaseModel):
current_answer: AgentAction | AgentFinish | None = Field(default=None)
is_finished: bool = Field(default=False)
ask_for_human_input: bool = Field(default=False)
use_native_tools: bool = Field(default=False)
pending_tool_calls: list[Any] = Field(default_factory=list)
class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@@ -179,6 +191,10 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
)
)
# Native tool calling support
self._openai_tools: list[dict[str, Any]] = []
self._available_functions: dict[str, Callable[..., Any]] = {}
self._state = AgentReActState()
def _ensure_flow_initialized(self) -> None:
@@ -189,14 +205,66 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
Only the instance that actually executes via invoke() will emit events.
"""
if not self._flow_initialized:
current_tracing = is_tracing_enabled_in_context()
# Now call Flow's __init__ which will replace self._state
# with Flow's managed state. Suppress flow events since this is
# an agent executor, not a user-facing flow.
super().__init__(
suppress_flow_events=True,
tracing=current_tracing if current_tracing else None,
)
self._flow_initialized = True
def _check_native_tool_support(self) -> bool:
"""Check if LLM supports native function calling.
Returns:
True if the LLM supports native function calling and tools are available.
"""
return (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and bool(self.original_tools)
)
def _setup_native_tools(self) -> None:
"""Convert tools to OpenAI schema format for native function calling."""
if self.original_tools:
self._openai_tools, self._available_functions = (
convert_tools_to_openai_schema(self.original_tools)
)
def _is_tool_call_list(self, response: list[Any]) -> bool:
"""Check if a response is a list of tool calls.
Args:
response: The response to check.
Returns:
True if the response appears to be a list of tool calls.
"""
if not response:
return False
first_item = response[0]
# Check for OpenAI-style tool call structure
if hasattr(first_item, "function") or (
isinstance(first_item, dict) and "function" in first_item
):
return True
# Check for Anthropic-style tool call structure (ToolUseBlock)
if (
hasattr(first_item, "type")
and getattr(first_item, "type", None) == "tool_use"
):
return True
if hasattr(first_item, "name") and hasattr(first_item, "input"):
return True
# Check for Gemini-style function call (Part with function_call)
if hasattr(first_item, "function_call") and first_item.function_call:
return True
return False
@property
def use_stop_words(self) -> bool:
"""Check to determine if stop words are being used.
@@ -229,6 +297,11 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def initialize_reasoning(self) -> Literal["initialized"]:
"""Initialize the reasoning flow and emit agent start logs."""
self._show_start_logs()
# Check for native tool support on first iteration
if self.state.iterations == 0:
self.state.use_native_tools = self._check_native_tool_support()
if self.state.use_native_tools:
self._setup_native_tools()
return "initialized"
@listen("force_final_answer")
@@ -303,6 +376,69 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
handle_unknown_error(self._printer, e)
raise
@listen("continue_reasoning_native")
def call_llm_native_tools(
self,
) -> Literal["native_tool_calls", "native_finished", "context_error"]:
"""Execute LLM call with native function calling.
Returns routing decision based on whether tool calls or final answer.
"""
try:
enforce_rpm_limit(self.request_within_rpm_limit)
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution.
answer = get_llm_response(
llm=self.llm,
messages=list(self.state.messages),
callbacks=self.callbacks,
printer=self._printer,
tools=self._openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
# Check if the response is a list of tool calls
if isinstance(answer, list) and answer and self._is_tool_call_list(answer):
# Store tool calls for sequential processing
self.state.pending_tool_calls = list(answer)
return "native_tool_calls"
# Text response - this is the final answer
if isinstance(answer, str):
self.state.current_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(answer)
return "native_finished"
# Unexpected response type, treat as final answer
self.state.current_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(str(answer))
return "native_finished"
except Exception as e:
if is_context_length_exceeded(e):
self._last_context_error = e
return "context_error"
if e.__class__.__module__.startswith("litellm"):
raise e
handle_unknown_error(self._printer, e)
raise
@router(call_llm_and_parse)
def route_by_answer_type(self) -> Literal["execute_tool", "agent_finished"]:
"""Route based on whether answer is AgentAction or AgentFinish."""
@@ -358,6 +494,14 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.is_finished = True
return "tool_result_is_final"
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.state.messages.append(reasoning_message)
return "tool_completed"
except Exception as e:
@@ -367,6 +511,143 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self._console.print(error_text)
raise
@listen("native_tool_calls")
def execute_native_tool(self) -> Literal["native_tool_completed"]:
"""Execute a single native tool call and inject reasoning prompt.
Processes only the FIRST tool call from pending_tool_calls for
sequential execution with reflection after each tool.
"""
if not self.state.pending_tool_calls:
return "native_tool_completed"
tool_call = self.state.pending_tool_calls[0]
self.state.pending_tool_calls = [] # Clear pending calls
# Extract tool call info - handle OpenAI, Anthropic, and Gemini formats
if hasattr(tool_call, "function"):
# OpenAI format: has .function.name and .function.arguments
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.function.name
func_args = tool_call.function.arguments
elif hasattr(tool_call, "function_call") and tool_call.function_call:
# Gemini format: has .function_call.name and .function_call.args
call_id = f"call_{id(tool_call)}"
func_name = tool_call.function_call.name
func_args = (
dict(tool_call.function_call.args)
if tool_call.function_call.args
else {}
)
elif hasattr(tool_call, "name") and hasattr(tool_call, "input"):
# Anthropic format: has .name and .input (ToolUseBlock)
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.name
func_args = tool_call.input # Already a dict in Anthropic
elif isinstance(tool_call, dict):
call_id = tool_call.get("id", f"call_{id(tool_call)}")
func_info = tool_call.get("function", {})
func_name = func_info.get("name", "") or tool_call.get("name", "")
func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
else:
return "native_tool_completed"
# Append assistant message with single tool call
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": func_args
if isinstance(func_args, str)
else json.dumps(func_args),
},
}
],
}
self.state.messages.append(assistant_message)
# Parse arguments for the single tool call
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
),
)
# Execute the tool
result = "Tool not found"
if func_name in self._available_functions:
try:
tool_func = self._available_functions[func_name]
result = tool_func(**args_dict)
if not isinstance(result, str):
result = str(result)
except Exception as e:
result = f"Error executing tool: {e}"
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"content": result,
}
self.state.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
self._printer.print(
content=f"Tool {func_name} executed with result: {result[:200]}...",
color="green",
)
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.state.messages.append(reasoning_message)
return "native_tool_completed"
@router(execute_native_tool)
def increment_native_and_continue(self) -> Literal["initialized"]:
"""Increment iteration counter after native tool execution."""
self.state.iterations += 1
return "initialized"
@listen("initialized")
def continue_iteration(self) -> Literal["check_iteration"]:
"""Bridge listener that connects iteration loop back to iteration check."""
@@ -375,10 +656,14 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@router(or_(initialize_reasoning, continue_iteration))
def check_max_iterations(
self,
) -> Literal["force_final_answer", "continue_reasoning"]:
) -> Literal[
"force_final_answer", "continue_reasoning", "continue_reasoning_native"
]:
"""Check if max iterations reached before proceeding with reasoning."""
if has_reached_max_iterations(self.state.iterations, self.max_iter):
return "force_final_answer"
if self.state.use_native_tools:
return "continue_reasoning_native"
return "continue_reasoning"
@router(execute_tool_action)
@@ -387,7 +672,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.iterations += 1
return "initialized"
@listen(or_("agent_finished", "tool_result_is_final"))
@listen(or_("agent_finished", "tool_result_is_final", "native_finished"))
def finalize(self) -> Literal["completed", "skipped"]:
"""Finalize execution and emit completion logs."""
if self.state.current_answer is None:
@@ -475,6 +760,8 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False
self.state.use_native_tools = False
self.state.pending_tool_calls = []
if "system" in self.prompt:
prompt = cast("SystemPromptResult", self.prompt)

View File

@@ -931,7 +931,6 @@ class LLM(BaseLLM):
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
if not tool_calls or not available_functions:
if response_model and self.is_litellm:
instructor_instance = InternalInstructor(
content=full_response,
@@ -1144,8 +1143,12 @@ class LLM(BaseLLM):
if response_model:
params["response_model"] = response_model
response = litellm.completion(**params)
if hasattr(response,"usage") and not isinstance(response.usage, type) and response.usage:
if (
hasattr(response, "usage")
and not isinstance(response.usage, type)
and response.usage
):
usage_info = response.usage
self._track_token_usage_internal(usage_info)
@@ -1199,16 +1202,19 @@ class LLM(BaseLLM):
)
return text_response
# --- 6) If there is no text response, no available functions, but there are tool calls, return the tool calls
if tool_calls and not available_functions and not text_response:
# --- 6) If there are tool calls but no available functions, return the tool calls
# This allows the caller (e.g., executor) to handle tool execution
if tool_calls and not available_functions:
return tool_calls
# --- 7) Handle tool calls if present
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
# --- 7) Handle tool calls if present (execute when available_functions provided)
if tool_calls and available_functions:
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
# --- 8) If tool call handling didn't return a result, emit completion event and return text response
self._handle_emit_call_events(
response=text_response,
@@ -1273,7 +1279,11 @@ class LLM(BaseLLM):
params["response_model"] = response_model
response = await litellm.acompletion(**params)
if hasattr(response,"usage") and not isinstance(response.usage, type) and response.usage:
if (
hasattr(response, "usage")
and not isinstance(response.usage, type)
and response.usage
):
usage_info = response.usage
self._track_token_usage_internal(usage_info)
@@ -1321,14 +1331,18 @@ class LLM(BaseLLM):
)
return text_response
if tool_calls and not available_functions and not text_response:
# If there are tool calls but no available functions, return the tool calls
# This allows the caller (e.g., executor) to handle tool execution
if tool_calls and not available_functions:
return tool_calls
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
# Handle tool calls if present (execute when available_functions provided)
if tool_calls and available_functions:
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
self._handle_emit_call_events(
response=text_response,
@@ -1363,7 +1377,7 @@ class LLM(BaseLLM):
"""
full_response = ""
chunk_count = 0
usage_info = None
accumulated_tool_args: defaultdict[int, AccumulatedToolArgs] = defaultdict(

View File

@@ -445,7 +445,7 @@ class BaseLLM(ABC):
from_agent=from_agent,
)
return str(result)
return result
except Exception as e:
error_msg = f"Error executing function '{function_name}': {e!s}"

View File

@@ -418,6 +418,7 @@ class AnthropicCompletion(BaseLLM):
- System messages are separate from conversation messages
- Messages must alternate between user and assistant
- First message must be from user
- Tool results must be in user messages with tool_result content blocks
- When thinking is enabled, assistant messages must start with thinking blocks
Args:
@@ -431,6 +432,7 @@ class AnthropicCompletion(BaseLLM):
formatted_messages: list[LLMMessage] = []
system_message: str | None = None
pending_tool_results: list[dict[str, Any]] = []
for message in base_formatted:
role = message.get("role")
@@ -441,16 +443,47 @@ class AnthropicCompletion(BaseLLM):
system_message += f"\n\n{content}"
else:
system_message = cast(str, content)
else:
role_str = role if role is not None else "user"
elif role == "tool":
# Convert OpenAI-style tool message to Anthropic tool_result format
# These will be collected and added as a user message
tool_call_id = message.get("tool_call_id", "")
tool_result = {
"type": "tool_result",
"tool_use_id": tool_call_id,
"content": content if content else "",
}
pending_tool_results.append(tool_result)
elif role == "assistant":
# First, flush any pending tool results as a user message
if pending_tool_results:
formatted_messages.append(
{"role": "user", "content": pending_tool_results}
)
pending_tool_results = []
if isinstance(content, list):
formatted_messages.append({"role": role_str, "content": content})
elif (
role_str == "assistant"
and self.thinking
and self.previous_thinking_blocks
):
# Handle assistant message with tool_calls (convert to Anthropic format)
tool_calls = message.get("tool_calls", [])
if tool_calls:
assistant_content: list[dict[str, Any]] = []
for tc in tool_calls:
if isinstance(tc, dict):
func = tc.get("function", {})
tool_use = {
"type": "tool_use",
"id": tc.get("id", ""),
"name": func.get("name", ""),
"input": json.loads(func.get("arguments", "{}"))
if isinstance(func.get("arguments"), str)
else func.get("arguments", {}),
}
assistant_content.append(tool_use)
if assistant_content:
formatted_messages.append(
{"role": "assistant", "content": assistant_content}
)
elif isinstance(content, list):
formatted_messages.append({"role": "assistant", "content": content})
elif self.thinking and self.previous_thinking_blocks:
structured_content = cast(
list[dict[str, Any]],
[
@@ -459,14 +492,34 @@ class AnthropicCompletion(BaseLLM):
],
)
formatted_messages.append(
LLMMessage(role=role_str, content=structured_content)
LLMMessage(role="assistant", content=structured_content)
)
else:
content_str = content if content is not None else ""
formatted_messages.append(
LLMMessage(role="assistant", content=content_str)
)
else:
# User message - first flush any pending tool results
if pending_tool_results:
formatted_messages.append(
{"role": "user", "content": pending_tool_results}
)
pending_tool_results = []
role_str = role if role is not None else "user"
if isinstance(content, list):
formatted_messages.append({"role": role_str, "content": content})
else:
content_str = content if content is not None else ""
formatted_messages.append(
LLMMessage(role=role_str, content=content_str)
)
# Flush any remaining pending tool results
if pending_tool_results:
formatted_messages.append({"role": "user", "content": pending_tool_results})
# Ensure first message is from user (Anthropic requirement)
if not formatted_messages:
# If no messages, add a default user message
@@ -526,13 +579,19 @@ class AnthropicCompletion(BaseLLM):
return structured_json
# Check if Claude wants to use tools
if response.content and available_functions:
if response.content:
tool_uses = [
block for block in response.content if isinstance(block, ToolUseBlock)
]
if tool_uses:
# Handle tool use conversation flow
# If no available_functions, return tool calls for executor to handle
# This allows the executor to manage tool execution with proper
# message history and post-tool reasoning prompts
if not available_functions:
return list(tool_uses)
# Handle tool use conversation flow internally
return self._handle_tool_use_conversation(
response,
tool_uses,
@@ -696,7 +755,7 @@ class AnthropicCompletion(BaseLLM):
return structured_json
if final_message.content and available_functions:
if final_message.content:
tool_uses = [
block
for block in final_message.content
@@ -704,7 +763,11 @@ class AnthropicCompletion(BaseLLM):
]
if tool_uses:
# Handle tool use conversation flow
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
# Handle tool use conversation flow internally
return self._handle_tool_use_conversation(
final_message,
tool_uses,
@@ -933,12 +996,16 @@ class AnthropicCompletion(BaseLLM):
return structured_json
if response.content and available_functions:
if response.content:
tool_uses = [
block for block in response.content if isinstance(block, ToolUseBlock)
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
return await self._ahandle_tool_use_conversation(
response,
tool_uses,
@@ -1079,7 +1146,7 @@ class AnthropicCompletion(BaseLLM):
return structured_json
if final_message.content and available_functions:
if final_message.content:
tool_uses = [
block
for block in final_message.content
@@ -1087,6 +1154,10 @@ class AnthropicCompletion(BaseLLM):
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
return await self._ahandle_tool_use_conversation(
final_message,
tool_uses,

View File

@@ -514,10 +514,31 @@ class AzureCompletion(BaseLLM):
for message in base_formatted:
role = message.get("role", "user") # Default to user if no role
content = message.get("content", "")
# Handle None content - Azure requires string content
content = message.get("content") or ""
# Azure AI Inference requires both 'role' and 'content'
azure_messages.append({"role": role, "content": content})
# Handle tool role messages - keep as tool role for Azure OpenAI
if role == "tool":
tool_call_id = message.get("tool_call_id", "unknown")
azure_messages.append(
{
"role": "tool",
"tool_call_id": tool_call_id,
"content": content,
}
)
# Handle assistant messages with tool_calls
elif role == "assistant" and message.get("tool_calls"):
tool_calls = message.get("tool_calls", [])
azure_msg: LLMMessage = {
"role": "assistant",
"content": content, # Already defaulted to "" above
"tool_calls": tool_calls,
}
azure_messages.append(azure_msg)
else:
# Azure AI Inference requires both 'role' and 'content'
azure_messages.append({"role": role, "content": content})
return azure_messages
@@ -604,6 +625,11 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
)
# If there are tool_calls but no available_functions, return the tool_calls
# This allows the caller (e.g., executor) to handle tool execution
if message.tool_calls and not available_functions:
return list(message.tool_calls)
# Handle tool calls
if message.tool_calls and available_functions:
tool_call = message.tool_calls[0] # Handle first tool call
@@ -775,6 +801,21 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
)
# If there are tool_calls but no available_functions, return them
# in OpenAI-compatible format for executor to handle
if tool_calls and not available_functions:
return [
{
"id": call_data.get("id", f"call_{idx}"),
"type": "function",
"function": {
"name": call_data["name"],
"arguments": call_data["arguments"],
},
}
for idx, call_data in tool_calls.items()
]
# Handle completed tool calls
if tool_calls and available_functions:
for call_data in tool_calls.values():

View File

@@ -606,6 +606,17 @@ class GeminiCompletion(BaseLLM):
if response.candidates and (self.tools or available_functions):
candidate = response.candidates[0]
if candidate.content and candidate.content.parts:
# Collect function call parts
function_call_parts = [
part for part in candidate.content.parts if part.function_call
]
# If there are function calls but no available_functions,
# return them for the executor to handle (like OpenAI/Anthropic)
if function_call_parts and not available_functions:
return function_call_parts
# Otherwise execute the tools internally
for part in candidate.content.parts:
if part.function_call:
function_name = part.function_call.name
@@ -720,7 +731,7 @@ class GeminiCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str:
) -> str | list[dict[str, Any]]:
"""Finalize streaming response with usage tracking, function execution, and events.
Args:
@@ -738,6 +749,21 @@ class GeminiCompletion(BaseLLM):
"""
self._track_token_usage_internal(usage_data)
# If there are function calls but no available_functions,
# return them for the executor to handle
if function_calls and not available_functions:
return [
{
"id": call_data["id"],
"function": {
"name": call_data["name"],
"arguments": json.dumps(call_data["args"]),
},
"type": "function",
}
for call_data in function_calls.values()
]
# Handle completed function calls
if function_calls and available_functions:
for call_data in function_calls.values():

View File

@@ -428,6 +428,12 @@ class OpenAICompletion(BaseLLM):
choice: Choice = response.choices[0]
message = choice.message
# If there are tool_calls but no available_functions, return the tool_calls
# This allows the caller (e.g., executor) to handle tool execution
if message.tool_calls and not available_functions:
return list(message.tool_calls)
# If there are tool_calls and available_functions, execute the tools
if message.tool_calls and available_functions:
tool_call = message.tool_calls[0]
function_name = tool_call.function.name
@@ -725,6 +731,15 @@ class OpenAICompletion(BaseLLM):
choice: Choice = response.choices[0]
message = choice.message
# If there are tool_calls but no available_functions, return the tool_calls
# This allows the caller (e.g., executor) to handle tool execution
if message.tool_calls and not available_functions:
print("--------------------------------")
print("lorenze tool_calls", list(message.tool_calls))
print("--------------------------------")
return list(message.tool_calls)
# If there are tool_calls and available_functions, execute the tools
if message.tool_calls and available_functions:
tool_call = message.tool_calls[0]
function_name = tool_call.function.name

View File

@@ -11,6 +11,9 @@
"role_playing": "You are {role}. {backstory}\nYour personal goal is: {goal}",
"tools": "\nYou ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\n{tools}\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought: you should always think about what to do\nAction: the action to take, only one name of [{tool_names}], just the name, exactly as it's written.\nAction Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce all necessary information is gathered, return the following format:\n\n```\nThought: I now know the final answer\nFinal Answer: the final answer to the original input question\n```",
"no_tools": "\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!",
"native_tools": "\nUse available tools to gather information and complete your task.",
"native_task": "\nCurrent Task: {input}\n\nThis is VERY important to you, your job depends on it!",
"post_tool_reasoning": "PAUSE and THINK before responding.\n\nInternally consider (DO NOT output these steps):\n- What key insights did the tool provide?\n- Have I fulfilled ALL requirements from my original instructions (e.g., minimum tool calls, specific sources)?\n- Do I have enough information to fully answer the task?\n\nIF you have NOT met all requirements or need more information: Call another tool now.\n\nIF you have met all requirements and have sufficient information: Provide ONLY your final answer in the format specified by the task's expected output. Do NOT include reasoning steps, analysis sections, or meta-commentary. Just deliver the answer.",
"format": "I MUST either use a tool (use one at time) OR give my best final answer not both at the same time. When responding, I must use the following format:\n\n```\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action, dictionary enclosed in curly braces\nObservation: the result of the action\n```\nThis Thought/Action/Action Input/Result can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```",
"final_answer_format": "If you don't need to use any more tools, you must give your best complete final answer, make sure it satisfies the expected criteria, use the EXACT format below:\n\n```\nThought: I now can give a great answer\nFinal Answer: my best complete final answer to the task.\n\n```",
"format_without_tools": "\nSorry, I didn't use the right format. I MUST either use a tool (among the available ones), OR give my best final answer.\nHere is the expected format I must follow:\n\n```\nQuestion: the input question you must answer\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n```\n This Thought/Action/Action Input/Result process can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```",

View File

@@ -108,6 +108,65 @@ def render_text_description_and_args(
return "\n".join(tool_strings)
def convert_tools_to_openai_schema(
tools: Sequence[BaseTool | CrewStructuredTool],
) -> tuple[list[dict[str, Any]], dict[str, Callable[..., Any]]]:
"""Convert CrewAI tools to OpenAI function calling format.
This function converts CrewAI BaseTool and CrewStructuredTool objects
into the OpenAI-compatible tool schema format that can be passed to
LLM providers for native function calling.
Args:
tools: List of CrewAI tool objects to convert.
Returns:
Tuple containing:
- List of OpenAI-format tool schema dictionaries
- Dict mapping tool names to their callable run() methods
Example:
>>> tools = [CalculatorTool(), SearchTool()]
>>> schemas, functions = convert_tools_to_openai_schema(tools)
>>> # schemas can be passed to llm.call(tools=schemas)
>>> # functions can be passed to llm.call(available_functions=functions)
"""
openai_tools: list[dict[str, Any]] = []
available_functions: dict[str, Callable[..., Any]] = {}
for tool in tools:
# Get the JSON schema for tool parameters
parameters: dict[str, Any] = {}
if hasattr(tool, "args_schema") and tool.args_schema is not None:
try:
parameters = tool.args_schema.model_json_schema()
# Remove title and description from schema root as they're redundant
parameters.pop("title", None)
parameters.pop("description", None)
except Exception:
parameters = {}
# Extract original description from formatted description
# BaseTool formats description as "Tool Name: ...\nTool Arguments: ...\nTool Description: {original}"
description = tool.description
if "Tool Description:" in description:
# Extract the original description after "Tool Description:"
description = description.split("Tool Description:")[-1].strip()
schema: dict[str, Any] = {
"type": "function",
"function": {
"name": tool.name,
"description": description,
"parameters": parameters,
},
}
openai_tools.append(schema)
available_functions[tool.name] = tool.run
return openai_tools, available_functions
def has_reached_max_iterations(iterations: int, max_iterations: int) -> bool:
"""Check if the maximum number of iterations has been reached.
@@ -234,11 +293,13 @@ def get_llm_response(
messages: list[LLMMessage],
callbacks: list[TokenCalcHandler],
printer: Printer,
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Callable[..., Any]] | None = None,
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | LiteAgent | None = None,
) -> str:
) -> str | Any:
"""Call the LLM and return the response, handling any invalid responses.
Args:
@@ -246,13 +307,16 @@ def get_llm_response(
messages: The messages to send to the LLM.
callbacks: List of callbacks for the LLM call.
printer: Printer instance for output.
tools: Optional list of tool schemas for native function calling.
available_functions: Optional dict mapping function names to callables.
from_task: Optional task context for the LLM call.
from_agent: Optional agent context for the LLM call.
response_model: Optional Pydantic model for structured outputs.
executor_context: Optional executor context for hook invocation.
Returns:
The response from the LLM as a string.
The response from the LLM as a string, or tool call results if
native function calling is used.
Raises:
Exception: If an error occurs.
@@ -267,7 +331,9 @@ def get_llm_response(
try:
answer = llm.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent, # type: ignore[arg-type]
response_model=response_model,
@@ -289,11 +355,13 @@ async def aget_llm_response(
messages: list[LLMMessage],
callbacks: list[TokenCalcHandler],
printer: Printer,
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Callable[..., Any]] | None = None,
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | None = None,
) -> str:
) -> str | Any:
"""Call the LLM asynchronously and return the response.
Args:
@@ -301,13 +369,16 @@ async def aget_llm_response(
messages: The messages to send to the LLM.
callbacks: List of callbacks for the LLM call.
printer: Printer instance for output.
tools: Optional list of tool schemas for native function calling.
available_functions: Optional dict mapping function names to callables.
from_task: Optional task context for the LLM call.
from_agent: Optional agent context for the LLM call.
response_model: Optional Pydantic model for structured outputs.
executor_context: Optional executor context for hook invocation.
Returns:
The response from the LLM as a string.
The response from the LLM as a string, or tool call results if
native function calling is used.
Raises:
Exception: If an error occurs.
@@ -321,7 +392,9 @@ async def aget_llm_response(
try:
answer = await llm.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent, # type: ignore[arg-type]
response_model=response_model,

View File

@@ -22,7 +22,9 @@ class SystemPromptResult(StandardPromptResult):
user: Annotated[str, "The user prompt component"]
COMPONENTS = Literal["role_playing", "tools", "no_tools", "task"]
COMPONENTS = Literal[
"role_playing", "tools", "no_tools", "native_tools", "task", "native_task"
]
class Prompts(BaseModel):
@@ -36,6 +38,10 @@ class Prompts(BaseModel):
has_tools: bool = Field(
default=False, description="Indicates if the agent has access to tools"
)
use_native_tool_calling: bool = Field(
default=False,
description="Whether to use native function calling instead of ReAct format",
)
system_template: str | None = Field(
default=None, description="Custom system prompt template"
)
@@ -58,12 +64,24 @@ class Prompts(BaseModel):
A dictionary containing the constructed prompt(s).
"""
slices: list[COMPONENTS] = ["role_playing"]
# When using native tool calling with tools, use native_tools instructions
# When using ReAct pattern with tools, use tools instructions
# When no tools are available, use no_tools instructions
if self.has_tools:
slices.append("tools")
if self.use_native_tool_calling:
slices.append("native_tools")
else:
slices.append("tools")
else:
slices.append("no_tools")
system: str = self._build_prompt(slices)
slices.append("task")
# Use native_task for native tool calling (no "Thought:" prompt)
# Use task for ReAct pattern (includes "Thought:" prompt)
task_slice: COMPONENTS = (
"native_task" if self.use_native_tool_calling else "task"
)
slices.append(task_slice)
if (
not self.system_template
@@ -72,7 +90,7 @@ class Prompts(BaseModel):
):
return SystemPromptResult(
system=system,
user=self._build_prompt(["task"]),
user=self._build_prompt([task_slice]),
prompt=self._build_prompt(slices),
)
return StandardPromptResult(

View File

@@ -0,0 +1,479 @@
"""Integration tests for native tool calling functionality.
These tests verify that agents can use native function calling
when the LLM supports it, across multiple providers.
"""
from __future__ import annotations
import os
from typing import Any
from unittest.mock import patch, MagicMock
import pytest
from pydantic import BaseModel, Field
from crewai import Agent, Crew, Task
from crewai.llm import LLM
from crewai.tools.base_tool import BaseTool
# Check for optional provider availability
try:
import anthropic
HAS_ANTHROPIC = True
except ImportError:
HAS_ANTHROPIC = False
try:
import google.genai
HAS_GOOGLE_GENAI = True
except ImportError:
HAS_GOOGLE_GENAI = False
try:
import boto3
HAS_BOTO3 = True
except ImportError:
HAS_BOTO3 = False
class CalculatorInput(BaseModel):
"""Input schema for calculator tool."""
expression: str = Field(description="Mathematical expression to evaluate")
class CalculatorTool(BaseTool):
"""A calculator tool that performs mathematical calculations."""
name: str = "calculator"
description: str = "Perform mathematical calculations. Use this for any math operations."
args_schema: type[BaseModel] = CalculatorInput
def _run(self, expression: str) -> str:
"""Execute the calculation."""
try:
# Safe evaluation for basic math
result = eval(expression) # noqa: S307
return f"The result of {expression} is {result}"
except Exception as e:
return f"Error calculating {expression}: {e}"
class WeatherInput(BaseModel):
"""Input schema for weather tool."""
location: str = Field(description="City name to get weather for")
class WeatherTool(BaseTool):
"""A mock weather tool for testing."""
name: str = "get_weather"
description: str = "Get the current weather for a location"
args_schema: type[BaseModel] = WeatherInput
def _run(self, location: str) -> str:
"""Get weather (mock implementation)."""
return f"The weather in {location} is sunny with a temperature of 72°F"
@pytest.fixture
def calculator_tool() -> CalculatorTool:
"""Create a calculator tool for testing."""
return CalculatorTool()
@pytest.fixture
def weather_tool() -> WeatherTool:
"""Create a weather tool for testing."""
return WeatherTool()
# =============================================================================
# OpenAI Provider Tests
# =============================================================================
class TestOpenAINativeToolCalling:
"""Tests for native tool calling with OpenAI models."""
@pytest.mark.vcr()
def test_openai_agent_with_native_tool_calling(
self, calculator_tool: CalculatorTool
) -> None:
"""Test OpenAI agent can use native tool calling."""
agent = Agent(
role="Math Assistant",
goal="Help users with mathematical calculations",
backstory="You are a helpful math assistant.",
tools=[calculator_tool],
llm=LLM(model="gpt-4o-mini"),
verbose=False,
max_iter=3,
)
task = Task(
description="Calculate what is 15 * 8",
expected_output="The result of the calculation",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.raw is not None
assert "120" in str(result.raw)
def test_openai_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test OpenAI agent kickoff with mocked LLM call."""
llm = LLM(model="gpt-4o-mini")
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Anthropic Provider Tests
# =============================================================================
@pytest.mark.skipif(not HAS_ANTHROPIC, reason="anthropic package not installed")
class TestAnthropicNativeToolCalling:
"""Tests for native tool calling with Anthropic models."""
@pytest.fixture(autouse=True)
def mock_anthropic_api_key(self):
"""Mock ANTHROPIC_API_KEY for tests."""
if "ANTHROPIC_API_KEY" not in os.environ:
with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "test-key"}):
yield
else:
yield
@pytest.mark.vcr()
def test_anthropic_agent_with_native_tool_calling(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Anthropic agent can use native tool calling."""
agent = Agent(
role="Math Assistant",
goal="Help users with mathematical calculations",
backstory="You are a helpful math assistant.",
tools=[calculator_tool],
llm=LLM(model="anthropic/claude-3-5-haiku-20241022"),
verbose=False,
max_iter=3,
)
task = Task(
description="Calculate what is 15 * 8",
expected_output="The result of the calculation",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.raw is not None
def test_anthropic_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Anthropic agent kickoff with mocked LLM call."""
llm = LLM(model="anthropic/claude-3-5-haiku-20241022")
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Google/Gemini Provider Tests
# =============================================================================
@pytest.mark.skipif(not HAS_GOOGLE_GENAI, reason="google-genai package not installed")
class TestGeminiNativeToolCalling:
"""Tests for native tool calling with Gemini models."""
@pytest.fixture(autouse=True)
def mock_google_api_key(self):
"""Mock GOOGLE_API_KEY for tests."""
with patch.dict(os.environ, {"GOOGLE_API_KEY": "test-key"}):
yield
@pytest.mark.vcr()
def test_gemini_agent_with_native_tool_calling(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Gemini agent can use native tool calling."""
agent = Agent(
role="Math Assistant",
goal="Help users with mathematical calculations",
backstory="You are a helpful math assistant.",
tools=[calculator_tool],
llm=LLM(model="gemini/gemini-2.0-flash-001"),
verbose=False,
max_iter=3,
)
task = Task(
description="Calculate what is 15 * 8",
expected_output="The result of the calculation",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.raw is not None
def test_gemini_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Gemini agent kickoff with mocked LLM call."""
llm = LLM(model="gemini/gemini-2.0-flash-001")
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Azure Provider Tests
# =============================================================================
class TestAzureNativeToolCalling:
"""Tests for native tool calling with Azure OpenAI models."""
@pytest.fixture(autouse=True)
def mock_azure_env(self):
"""Mock Azure environment variables for tests."""
env_vars = {
"AZURE_API_KEY": "test-key",
"AZURE_API_BASE": "https://test.openai.azure.com",
"AZURE_API_VERSION": "2024-02-15-preview",
}
with patch.dict(os.environ, env_vars):
yield
def test_azure_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Azure agent kickoff with mocked LLM call."""
llm = LLM(
model="azure/gpt-4o-mini",
api_key="test-key",
base_url="https://test.openai.azure.com",
)
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Bedrock Provider Tests
# =============================================================================
@pytest.mark.skipif(not HAS_BOTO3, reason="boto3 package not installed")
class TestBedrockNativeToolCalling:
"""Tests for native tool calling with AWS Bedrock models."""
@pytest.fixture(autouse=True)
def mock_aws_env(self):
"""Mock AWS environment variables for tests."""
env_vars = {
"AWS_ACCESS_KEY_ID": "test-key",
"AWS_SECRET_ACCESS_KEY": "test-secret",
"AWS_REGION": "us-east-1",
}
with patch.dict(os.environ, env_vars):
yield
def test_bedrock_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Bedrock agent kickoff with mocked LLM call."""
llm = LLM(model="bedrock/anthropic.claude-3-haiku-20240307-v1:0")
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Cross-Provider Native Tool Calling Behavior Tests
# =============================================================================
class TestNativeToolCallingBehavior:
"""Tests for native tool calling behavior across providers."""
def test_supports_function_calling_check(self) -> None:
"""Test that supports_function_calling() is properly checked."""
# OpenAI should support function calling
openai_llm = LLM(model="gpt-4o-mini")
assert hasattr(openai_llm, "supports_function_calling")
assert openai_llm.supports_function_calling() is True
@pytest.mark.skipif(not HAS_ANTHROPIC, reason="anthropic package not installed")
def test_anthropic_supports_function_calling(self) -> None:
"""Test that Anthropic models support function calling."""
with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "test-key"}):
llm = LLM(model="anthropic/claude-3-5-haiku-20241022")
assert hasattr(llm, "supports_function_calling")
assert llm.supports_function_calling() is True
@pytest.mark.skipif(not HAS_GOOGLE_GENAI, reason="google-genai package not installed")
def test_gemini_supports_function_calling(self) -> None:
"""Test that Gemini models support function calling."""
# with patch.dict(os.environ, {"GOOGLE_API_KEY": "test-key"}):
print("GOOGLE_API_KEY", os.getenv("GOOGLE_API_KEY"))
llm = LLM(model="gemini/gemini-2.5-flash")
assert hasattr(llm, "supports_function_calling")
# Gemini uses supports_tools property
assert llm.supports_function_calling() is True
# =============================================================================
# Token Usage Tests
# =============================================================================
class TestNativeToolCallingTokenUsage:
"""Tests for token usage with native tool calling."""
@pytest.mark.vcr()
def test_openai_native_tool_calling_token_usage(
self, calculator_tool: CalculatorTool
) -> None:
"""Test token usage tracking with OpenAI native tool calling."""
agent = Agent(
role="Calculator",
goal="Perform calculations efficiently",
backstory="You calculate things.",
tools=[calculator_tool],
llm=LLM(model="gpt-4o-mini"),
verbose=False,
max_iter=3,
)
task = Task(
description="What is 100 / 4?",
expected_output="The result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.token_usage is not None
assert result.token_usage.total_tokens > 0
assert result.token_usage.successful_requests >= 1
print(f"\n[OPENAI NATIVE TOOL CALLING TOKEN USAGE]")
print(f" Prompt tokens: {result.token_usage.prompt_tokens}")
print(f" Completion tokens: {result.token_usage.completion_tokens}")
print(f" Total tokens: {result.token_usage.total_tokens}")

View File

@@ -0,0 +1,214 @@
"""Tests for agent utility functions."""
from __future__ import annotations
from typing import Any
import pytest
from pydantic import BaseModel, Field
from crewai.tools.base_tool import BaseTool
from crewai.utilities.agent_utils import convert_tools_to_openai_schema
class CalculatorInput(BaseModel):
"""Input schema for calculator tool."""
expression: str = Field(description="Mathematical expression to evaluate")
class CalculatorTool(BaseTool):
"""A simple calculator tool for testing."""
name: str = "calculator"
description: str = "Perform mathematical calculations"
args_schema: type[BaseModel] = CalculatorInput
def _run(self, expression: str) -> str:
"""Execute the calculation."""
try:
result = eval(expression) # noqa: S307
return str(result)
except Exception as e:
return f"Error: {e}"
class SearchInput(BaseModel):
"""Input schema for search tool."""
query: str = Field(description="Search query")
max_results: int = Field(default=10, description="Maximum number of results")
class SearchTool(BaseTool):
"""A search tool for testing."""
name: str = "web_search"
description: str = "Search the web for information"
args_schema: type[BaseModel] = SearchInput
def _run(self, query: str, max_results: int = 10) -> str:
"""Execute the search."""
return f"Search results for '{query}' (max {max_results})"
class NoSchemaTool(BaseTool):
"""A tool without an args schema for testing edge cases."""
name: str = "simple_tool"
description: str = "A simple tool with no schema"
def _run(self, **kwargs: Any) -> str:
"""Execute the tool."""
return "Simple tool executed"
class TestConvertToolsToOpenaiSchema:
"""Tests for convert_tools_to_openai_schema function."""
def test_converts_single_tool(self) -> None:
"""Test converting a single tool to OpenAI schema."""
tools = [CalculatorTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
assert len(schemas) == 1
assert len(functions) == 1
schema = schemas[0]
assert schema["type"] == "function"
assert schema["function"]["name"] == "calculator"
assert schema["function"]["description"] == "Perform mathematical calculations"
assert "properties" in schema["function"]["parameters"]
assert "expression" in schema["function"]["parameters"]["properties"]
def test_converts_multiple_tools(self) -> None:
"""Test converting multiple tools to OpenAI schema."""
tools = [CalculatorTool(), SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
assert len(schemas) == 2
assert len(functions) == 2
# Check calculator
calc_schema = next(s for s in schemas if s["function"]["name"] == "calculator")
assert calc_schema["function"]["description"] == "Perform mathematical calculations"
# Check search
search_schema = next(s for s in schemas if s["function"]["name"] == "web_search")
assert search_schema["function"]["description"] == "Search the web for information"
assert "query" in search_schema["function"]["parameters"]["properties"]
assert "max_results" in search_schema["function"]["parameters"]["properties"]
def test_functions_dict_contains_callables(self) -> None:
"""Test that the functions dict maps names to callable run methods."""
tools = [CalculatorTool(), SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
assert "calculator" in functions
assert "web_search" in functions
assert callable(functions["calculator"])
assert callable(functions["web_search"])
def test_function_can_be_called(self) -> None:
"""Test that the returned function can be called."""
tools = [CalculatorTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
result = functions["calculator"](expression="2 + 2")
assert result == "4"
def test_empty_tools_list(self) -> None:
"""Test with an empty tools list."""
schemas, functions = convert_tools_to_openai_schema([])
assert schemas == []
assert functions == {}
def test_schema_has_required_fields(self) -> None:
"""Test that the schema includes required fields information."""
tools = [SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
schema = schemas[0]
params = schema["function"]["parameters"]
# Should have required array
assert "required" in params
assert "query" in params["required"]
def test_tool_without_args_schema(self) -> None:
"""Test converting a tool that doesn't have an args_schema."""
# Create a minimal tool without args_schema
class MinimalTool(BaseTool):
name: str = "minimal"
description: str = "A minimal tool"
def _run(self) -> str:
return "done"
tools = [MinimalTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
assert len(schemas) == 1
schema = schemas[0]
assert schema["function"]["name"] == "minimal"
# Parameters should be empty dict or have minimal schema
assert isinstance(schema["function"]["parameters"], dict)
def test_schema_structure_matches_openai_format(self) -> None:
"""Test that the schema structure matches OpenAI's expected format."""
tools = [CalculatorTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
schema = schemas[0]
# Top level must have "type": "function"
assert schema["type"] == "function"
# Must have "function" key with nested structure
assert "function" in schema
func = schema["function"]
# Function must have name and description
assert "name" in func
assert "description" in func
assert isinstance(func["name"], str)
assert isinstance(func["description"], str)
# Parameters should be a valid JSON schema
assert "parameters" in func
params = func["parameters"]
assert isinstance(params, dict)
def test_removes_redundant_schema_fields(self) -> None:
"""Test that redundant title and description are removed from parameters."""
tools = [CalculatorTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
params = schemas[0]["function"]["parameters"]
# Title should be removed as it's redundant with function name
assert "title" not in params
def test_preserves_field_descriptions(self) -> None:
"""Test that field descriptions are preserved in the schema."""
tools = [SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
params = schemas[0]["function"]["parameters"]
query_prop = params["properties"]["query"]
# Field description should be preserved
assert "description" in query_prop
assert query_prop["description"] == "Search query"
def test_preserves_default_values(self) -> None:
"""Test that default values are preserved in the schema."""
tools = [SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
params = schemas[0]["function"]["parameters"]
max_results_prop = params["properties"]["max_results"]
# Default value should be preserved
assert "default" in max_results_prop
assert max_results_prop["default"] == 10