consolidate agent logic: move native tool-call execution into shared agent_utils helpers

lorenzejay
2026-02-13 13:55:28 -08:00
parent e26d3e471d
commit fd6558e0f2
4 changed files with 530 additions and 639 deletions

View File

@@ -8,8 +8,6 @@ here — moved from AgentExecutor so the outer loop stays clean.
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime
import json
import time
from typing import TYPE_CHECKING, Any
@@ -19,24 +17,15 @@ from crewai.agents.parser import (
AgentAction,
AgentFinish,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.hooks.tool_hooks import (
ToolCallHookContext,
get_after_tool_call_hooks,
get_before_tool_call_hooks,
)
from crewai.utilities.agent_utils import (
convert_tools_to_openai_schema,
build_tool_calls_assistant_message,
check_native_tool_support,
enforce_rpm_limit,
extract_tool_call_info,
execute_single_native_tool_call,
format_message_for_llm,
is_tool_call_list,
process_llm_response,
track_delegation_if_needed,
setup_native_tools,
)
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.planning_types import TodoItem
@@ -115,11 +104,13 @@ class StepExecutor:
self._printer: Printer = Printer()
# Native tool support — set up once
self._use_native_tools = self._check_native_tool_support()
self._use_native_tools = check_native_tool_support(self.llm, self.original_tools)
self._openai_tools: list[dict[str, Any]] = []
self._available_functions: dict[str, Callable[..., Any]] = {}
if self._use_native_tools:
self._setup_native_tools()
if self._use_native_tools and self.original_tools:
self._openai_tools, self._available_functions = setup_native_tools(
self.original_tools
)
# ------------------------------------------------------------------
# Public API
@@ -372,7 +363,7 @@ class StepExecutor:
raise ValueError("Empty response from LLM")
# Check if the response is a list of tool calls
if isinstance(answer, list) and answer and self._is_tool_call_list(answer):
if isinstance(answer, list) and answer and is_tool_call_list(answer):
return self._execute_native_tool_calls(answer, messages, tool_calls_made)
# Text response — this is the final answer
@@ -395,236 +386,36 @@ class StepExecutor:
Returns final answer string if a tool has result_as_answer, else None.
"""
# Build assistant message with tool calls
tool_calls_to_report: list[dict[str, Any]] = []
for tool_call in tool_calls:
info = extract_tool_call_info(tool_call)
if not info:
continue
call_id, func_name, func_args = info
tool_calls_to_report.append(
{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": func_args
if isinstance(func_args, str)
else json.dumps(func_args),
},
}
)
if tool_calls_to_report:
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": tool_calls_to_report,
}
# Preserve raw parts for Gemini compatibility
if all(type(tc).__qualname__ == "Part" for tc in tool_calls):
assistant_message["raw_tool_call_parts"] = list(tool_calls)
# Build and append assistant message with tool call reports
assistant_message, _reports = build_tool_calls_assistant_message(tool_calls)
if assistant_message:
messages.append(assistant_message)
# Execute each tool call
# Execute each tool call via shared pipeline
final_answer: str | None = None
for tool_call in tool_calls:
info = extract_tool_call_info(tool_call)
if not info:
continue
call_id, func_name, func_args = info
tool_calls_made.append(func_name)
# Parse arguments
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
agent_key = (
getattr(self.agent, "key", "unknown") if self.agent else "unknown"
)
# Find original tool for cache_function and result_as_answer
original_tool = None
for tool in self.original_tools:
if sanitize_tool_name(tool.name) == func_name:
original_tool = tool
break
# Check max usage count
max_usage_reached = False
if (
original_tool
and original_tool.max_usage_count is not None
and original_tool.current_usage_count >= original_tool.max_usage_count
):
max_usage_reached = True
# Check cache
from_cache = False
input_str = json.dumps(args_dict) if args_dict else ""
result = "Tool not found"
if self.tools_handler and self.tools_handler.cache:
cached_result = self.tools_handler.cache.read(
tool=func_name, input=input_str
)
if cached_result is not None:
result = (
str(cached_result)
if not isinstance(cached_result, str)
else cached_result
)
from_cache = True
# Emit tool started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
),
)
track_delegation_if_needed(func_name, args_dict, self.task)
# Find structured tool for hooks
structured_tool: CrewStructuredTool | None = None
for structured in self.tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
# Before hooks
hook_blocked = False
before_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
call_result = execute_single_native_tool_call(
tool_call,
available_functions=self._available_functions,
original_tools=self.original_tools,
structured_tools=self.tools,
tools_handler=self.tools_handler,
agent=self.agent,
task=self.task,
crew=self.crew,
)
try:
for hook in get_before_tool_call_hooks():
if hook(before_hook_context) is False:
hook_blocked = True
break
except Exception: # noqa: S110
pass
if hook_blocked:
result = f"Tool execution blocked by hook. Tool: {func_name}"
elif not from_cache and not max_usage_reached:
if func_name in self._available_functions:
try:
tool_func = self._available_functions[func_name]
raw_result = tool_func(**args_dict)
# Cache result
if self.tools_handler and self.tools_handler.cache:
should_cache = True
if original_tool:
should_cache = original_tool.cache_function(
args_dict, raw_result
)
if should_cache:
self.tools_handler.cache.add(
tool=func_name, input=input_str, output=raw_result
)
result = (
str(raw_result)
if not isinstance(raw_result, str)
else raw_result
)
except Exception as e:
result = f"Error executing tool: {e}"
if self.task:
self.task.increment_tools_errors()
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
error=e,
),
)
elif max_usage_reached and original_tool:
result = (
f"Tool '{func_name}' has reached its usage limit of "
f"{original_tool.max_usage_count} times and cannot be used anymore."
)
# After hooks
after_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
agent=self.agent,
task=self.task,
crew=self.crew,
tool_result=result,
)
try:
for after_hook in get_after_tool_call_hooks():
hook_result = after_hook(after_hook_context)
if hook_result is not None:
result = hook_result
after_hook_context.tool_result = result
except Exception: # noqa: S110
pass
# Emit tool finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
event_source=self,
printer=self._printer,
verbose=bool(self.agent and self.agent.verbose),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"name": func_name,
"content": result,
}
messages.append(tool_message)
if call_result.func_name:
tool_calls_made.append(call_result.func_name)
if self.agent and self.agent.verbose:
cache_info = " (from cache)" if from_cache else ""
self._printer.print(
content=f"Tool {func_name} executed with result{cache_info}: {result[:200]}...",
color="green",
)
if call_result.tool_message:
messages.append(call_result.tool_message)
# Check result_as_answer
if (
original_tool
and hasattr(original_tool, "result_as_answer")
and original_tool.result_as_answer
):
final_answer = result
if call_result.result_as_answer:
final_answer = call_result.result
if final_answer is not None:
return final_answer
@@ -660,53 +451,3 @@ class StepExecutor:
pass
return self._i18n.retrieve("planning", "step_could_not_complete")
# ------------------------------------------------------------------
# Internal: Native tool support
# ------------------------------------------------------------------
def _check_native_tool_support(self) -> bool:
"""Check if LLM supports native function calling."""
return (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and bool(self.original_tools)
)
def _setup_native_tools(self) -> None:
"""Convert tools to OpenAI schema format for native function calling."""
if self.original_tools:
self._openai_tools, self._available_functions = (
convert_tools_to_openai_schema(self.original_tools)
)
def _is_tool_call_list(self, response: list[Any]) -> bool:
"""Check if a response is a list of tool calls."""
if not response:
return False
first_item = response[0]
# OpenAI-style
if hasattr(first_item, "function") or (
isinstance(first_item, dict) and "function" in first_item
):
return True
# Anthropic-style (ToolUseBlock)
if (
hasattr(first_item, "type")
and getattr(first_item, "type", None) == "tool_use"
):
return True
if hasattr(first_item, "name") and hasattr(first_item, "input"):
return True
# Bedrock-style
if (
isinstance(first_item, dict)
and "name" in first_item
and "input" in first_item
):
return True
# Gemini-style
if hasattr(first_item, "function_call") and first_item.function_call:
return True
return False

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
from datetime import datetime
import json
import threading
from typing import TYPE_CHECKING, Any, Literal, cast
@@ -33,22 +32,12 @@ from crewai.events.types.observation_events import (
PlanRefinementEvent,
PlanReplanTriggeredEvent,
)
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.flow.flow import Flow, StateProxy, listen, or_, router, start
from crewai.flow.types import FlowMethodName
from crewai.hooks.llm_hooks import (
get_after_llm_call_hooks,
get_before_llm_call_hooks,
)
from crewai.hooks.tool_hooks import (
ToolCallHookContext,
get_after_tool_call_hooks,
get_before_tool_call_hooks,
)
from crewai.hooks.types import (
AfterLLMCallHookCallable,
AfterLLMCallHookType,
@@ -56,8 +45,10 @@ from crewai.hooks.types import (
BeforeLLMCallHookType,
)
from crewai.utilities.agent_utils import (
convert_tools_to_openai_schema,
build_tool_calls_assistant_message,
check_native_tool_support,
enforce_rpm_limit,
execute_single_native_tool_call,
extract_tool_call_info,
format_message_for_llm,
get_llm_response,
@@ -69,8 +60,9 @@ from crewai.utilities.agent_utils import (
has_reached_max_iterations,
is_context_length_exceeded,
is_inside_event_loop,
is_tool_call_list,
process_llm_response,
track_delegation_if_needed,
setup_native_tools,
)
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.i18n import I18N, get_i18n
@@ -278,61 +270,19 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._flow_initialized = True
def _check_native_tool_support(self) -> bool:
"""Check if LLM supports native function calling.
Returns:
True if the LLM supports native function calling and tools are available.
"""
return (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and bool(self.original_tools)
)
"""Check if LLM supports native function calling."""
return check_native_tool_support(self.llm, self.original_tools)
def _setup_native_tools(self) -> None:
"""Convert tools to OpenAI schema format for native function calling."""
if self.original_tools:
self._openai_tools, self._available_functions = (
convert_tools_to_openai_schema(self.original_tools)
self._openai_tools, self._available_functions = setup_native_tools(
self.original_tools
)
def _is_tool_call_list(self, response: list[Any]) -> bool:
"""Check if a response is a list of tool calls.
Args:
response: The response to check.
Returns:
True if the response appears to be a list of tool calls.
"""
if not response:
return False
first_item = response[0]
# Check for OpenAI-style tool call structure
if hasattr(first_item, "function") or (
isinstance(first_item, dict) and "function" in first_item
):
return True
# Check for Anthropic-style tool call structure (ToolUseBlock)
if (
hasattr(first_item, "type")
and getattr(first_item, "type", None) == "tool_use"
):
return True
if hasattr(first_item, "name") and hasattr(first_item, "input"):
return True
# Check for Bedrock-style tool call structure (dict with name and input keys)
if (
isinstance(first_item, dict)
and "name" in first_item
and "input" in first_item
):
return True
# Check for Gemini-style function call (Part with function_call)
if hasattr(first_item, "function_call") and first_item.function_call:
return True
return False
"""Check if a response is a list of tool calls."""
return is_tool_call_list(response)
@property
def use_stop_words(self) -> bool:
@@ -1157,11 +1107,9 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
role = self.agent.role if self.agent else "Assistant"
goal = self.agent.goal if self.agent else "Complete tasks efficiently"
return f"""You are {role}. Your goal: {goal}
You are executing a specific step in a multi-step plan. Focus only on completing
the current step. Use the suggested tool if one is provided. Be concise and
provide clear results that can be used by subsequent steps."""
return self._i18n.retrieve("planning", "todo_system_prompt").format(
role=role, goal=goal,
)
@router("parallel_todos_complete")
def after_parallel_execution(
@@ -1509,254 +1457,40 @@ provide clear results that can be used by subsequent steps."""
if not self.state.pending_tool_calls:
return "native_tool_completed"
# Group all tool calls into a single assistant message
tool_calls_to_report = []
for tool_call in self.state.pending_tool_calls:
info = extract_tool_call_info(tool_call)
if not info:
continue
call_id, func_name, func_args = info
tool_calls_to_report.append(
{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": func_args
if isinstance(func_args, str)
else json.dumps(func_args),
},
}
)
if tool_calls_to_report:
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": tool_calls_to_report,
}
if all(
type(tc).__qualname__ == "Part" for tc in self.state.pending_tool_calls
):
assistant_message["raw_tool_call_parts"] = list(
self.state.pending_tool_calls
)
# Build and append assistant message with tool call reports
assistant_message, _reports = build_tool_calls_assistant_message(
self.state.pending_tool_calls
)
if assistant_message:
self.state.messages.append(assistant_message)
# Now execute each tool
# Execute each tool call via shared pipeline
while self.state.pending_tool_calls:
tool_call = self.state.pending_tool_calls.pop(0)
info = extract_tool_call_info(tool_call)
if not info:
continue
call_id, func_name, func_args = info
# Parse arguments
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Get agent_key for event tracking
agent_key = (
getattr(self.agent, "key", "unknown") if self.agent else "unknown"
)
# Find original tool by matching sanitized name (needed for cache_function and result_as_answer)
original_tool = None
for tool in self.original_tools or []:
if sanitize_tool_name(tool.name) == func_name:
original_tool = tool
break
# Check if tool has reached max usage count
max_usage_reached = False
if (
original_tool
and original_tool.max_usage_count is not None
and original_tool.current_usage_count >= original_tool.max_usage_count
):
max_usage_reached = True
# Check cache before executing
from_cache = False
input_str = json.dumps(args_dict) if args_dict else ""
if self.tools_handler and self.tools_handler.cache:
cached_result = self.tools_handler.cache.read(
tool=func_name, input=input_str
)
if cached_result is not None:
result = (
str(cached_result)
if not isinstance(cached_result, str)
else cached_result
)
from_cache = True
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
),
)
error_event_emitted = False
track_delegation_if_needed(func_name, args_dict, self.task)
structured_tool: CrewStructuredTool | None = None
for structured in self.tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
hook_blocked = False
before_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
call_result = execute_single_native_tool_call(
tool_call,
available_functions=self._available_functions,
original_tools=self.original_tools,
structured_tools=self.tools,
tools_handler=self.tools_handler,
agent=self.agent,
task=self.task,
crew=self.crew,
event_source=self,
printer=self._printer,
verbose=bool(self.agent and self.agent.verbose),
)
before_hooks = get_before_tool_call_hooks()
try:
for hook in before_hooks:
hook_result = hook(before_hook_context)
if hook_result is False:
hook_blocked = True
break
except Exception as hook_error:
if self.agent.verbose:
self._printer.print(
content=f"Error in before_tool_call hook: {hook_error}",
color="red",
)
if hook_blocked:
result = f"Tool execution blocked by hook. Tool: {func_name}"
elif not from_cache and not max_usage_reached:
result = "Tool not found"
if func_name in self._available_functions:
try:
tool_func = self._available_functions[func_name]
raw_result = tool_func(**args_dict)
if call_result.tool_message:
self.state.messages.append(call_result.tool_message)
# Add to cache after successful execution (before string conversion)
if self.tools_handler and self.tools_handler.cache:
should_cache = True
if original_tool:
should_cache = original_tool.cache_function(
args_dict, raw_result
)
if should_cache:
self.tools_handler.cache.add(
tool=func_name, input=input_str, output=raw_result
)
# Convert to string for message
result = (
str(raw_result)
if not isinstance(raw_result, str)
else raw_result
)
except Exception as e:
result = f"Error executing tool: {e}"
if self.task:
self.task.increment_tools_errors()
# Emit tool usage error event
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
error=e,
),
)
error_event_emitted = True
elif max_usage_reached and original_tool:
# Return error message when max usage limit is reached
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
# Execute after_tool_call hooks (even if blocked, to allow logging/monitoring)
after_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
agent=self.agent,
task=self.task,
crew=self.crew,
tool_result=result,
)
after_hooks = get_after_tool_call_hooks()
try:
for after_hook in after_hooks:
after_hook_result = after_hook(after_hook_context)
if after_hook_result is not None:
result = after_hook_result
after_hook_context.tool_result = result
except Exception as hook_error:
if self.agent.verbose:
self._printer.print(
content=f"Error in after_tool_call hook: {hook_error}",
color="red",
)
if not error_event_emitted:
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"name": func_name,
"content": result,
}
self.state.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
cache_info = " (from cache)" if from_cache else ""
self._printer.print(
content=f"Tool {func_name} executed with result{cache_info}: {result[:200]}...",
color="green",
)
if (
original_tool
and hasattr(original_tool, "result_as_answer")
and original_tool.result_as_answer
):
if call_result.result_as_answer:
# Set the result as the final answer
self.state.current_answer = AgentFinish(
thought="Tool result is the final answer",
output=result,
text=result,
output=call_result.result,
text=call_result.result,
)
self.state.is_finished = True
return "tool_result_is_final"
@@ -2152,17 +1886,14 @@ provide clear results that can be used by subsequent steps."""
# Build synthesis prompt
role = self.agent.role if self.agent else "Assistant"
system_prompt = (
f"You are {role}. You have completed a multi-step task. "
"Synthesize the results from all steps into a single, coherent "
"final response that directly addresses the original task. "
"Do NOT list step numbers or say 'Step 1 result'. "
"Produce a clean, polished answer as if you did it all at once."
)
user_prompt = (
f"## Original Task\n{task_description}\n\n"
f"## Results from each step\n{combined_steps}\n\n"
"Synthesize these results into a single, coherent final answer."
system_prompt = self._i18n.retrieve(
"planning", "synthesis_system_prompt"
).format(role=role)
user_prompt = self._i18n.retrieve(
"planning", "synthesis_user_prompt"
).format(
task_description=task_description,
combined_steps=combined_steps,
)
try:
@@ -2390,18 +2121,11 @@ provide clear results that can be used by subsequent steps."""
self.task.description if self.task else getattr(self, "_kickoff_input", "")
)
return f"""{original}
enhancement = self._i18n.retrieve(
"planning", "replan_enhancement_prompt"
).format(previous_context=previous_context)
IMPORTANT: Previous execution attempt did not fully succeed. Please create a revised plan
that accounts for the following context from the previous attempt:
{previous_context}
Consider:
1. What steps succeeded and can be built upon
2. What steps failed and why they might have failed
3. Alternative approaches that might work better
4. Whether dependencies need to be restructured"""
return f"{original}{enhancement}"
@router("needs_replan")
def handle_replan(self) -> Literal["has_todos", "no_todos"]:

View File

@@ -80,6 +80,10 @@
"step_executor_complete_step": "\nComplete this step and provide your result.",
"step_executor_force_final_answer": "You have used the maximum number of tool calls for this step. Based on the information gathered so far, provide your final answer now.",
"step_executor_force_final_answer_suffix": "\n\nFinal Answer: ",
"step_could_not_complete": "Step could not be completed within the iteration limit."
"step_could_not_complete": "Step could not be completed within the iteration limit.",
"todo_system_prompt": "You are {role}. Your goal: {goal}\n\nYou are executing a specific step in a multi-step plan. Focus only on completing the current step. Use the suggested tool if one is provided. Be concise and provide clear results that can be used by subsequent steps.",
"synthesis_system_prompt": "You are {role}. You have completed a multi-step task. Synthesize the results from all steps into a single, coherent final response that directly addresses the original task. Do NOT list step numbers or say 'Step 1 result'. Produce a clean, polished answer as if you did it all at once.",
"synthesis_user_prompt": "## Original Task\n{task_description}\n\n## Results from each step\n{combined_steps}\n\nSynthesize these results into a single, coherent final answer.",
"replan_enhancement_prompt": "\n\nIMPORTANT: Previous execution attempt did not fully succeed. Please create a revised plan that accounts for the following context from the previous attempt:\n\n{previous_context}\n\nConsider:\n1. What steps succeeded and can be built upon\n2. What steps failed and why they might have failed\n3. Alternative approaches that might work better\n4. Whether dependencies need to be restructured"
}
}
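These new keys are consumed through the I18N helper rather than inline f-strings. A minimal usage sketch, assuming the keys above are loaded under the "planning" section (the role and goal values here are hypothetical):

from crewai.utilities.i18n import get_i18n

i18n = get_i18n()
prompt = i18n.retrieve("planning", "todo_system_prompt").format(
    role="Research Analyst",          # hypothetical value
    goal="Summarize recent findings", # hypothetical value
)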

View File

@@ -2,6 +2,8 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from datetime import datetime
import json
import re
from typing import TYPE_CHECKING, Any, Final, Literal, TypedDict
@@ -37,6 +39,7 @@ from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.agent import Agent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.agents.tools_handler import ToolsHandler
from crewai.experimental.agent_executor import AgentExecutor
from crewai.lite_agent import LiteAgent
from crewai.llm import LLM
@@ -322,6 +325,66 @@ def enforce_rpm_limit(
request_within_rpm_limit()
def _prepare_llm_call(
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None,
messages: list[LLMMessage],
printer: Printer,
verbose: bool = True,
) -> list[LLMMessage]:
"""Shared pre-call logic: run before hooks and resolve messages.
Args:
executor_context: Optional executor context for hook invocation.
messages: The messages to send to the LLM.
printer: Printer instance for output.
verbose: Whether to print output.
Returns:
The resolved messages list (may come from executor_context).
Raises:
ValueError: If a before hook blocks the call.
"""
if executor_context is not None:
if not _setup_before_llm_call_hooks(executor_context, printer, verbose=verbose):
raise ValueError("LLM call blocked by before_llm_call hook")
messages = executor_context.messages
return messages
def _validate_and_finalize_llm_response(
answer: Any,
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None,
printer: Printer,
verbose: bool = True,
) -> str | BaseModel | Any:
"""Shared post-call logic: validate response and run after hooks.
Args:
answer: The raw LLM response.
executor_context: Optional executor context for hook invocation.
printer: Printer instance for output.
verbose: Whether to print output.
Returns:
The potentially modified response.
Raises:
ValueError: If the response is None or empty.
"""
if not answer:
if verbose:
printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError("Invalid response from LLM call - None or empty.")
return _setup_after_llm_call_hooks(
executor_context, answer, printer, verbose=verbose
)
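A condensed sketch of how the two helpers compose, based only on the behavior documented above: the pre-call helper raises ValueError when a before_llm_call hook blocks, and the post-call helper raises ValueError on an empty answer. The stub LLM and its call signature are hypothetical stand-ins, not the real LLM interface:

class _StubLLM:
    def call(self, messages):
        # Hypothetical stand-in; the real LLM.call takes more parameters.
        return "final answer"

def _sketch_llm_roundtrip(llm, messages, printer, executor_context=None):
    # Run before hooks and resolve messages (raises ValueError if blocked).
    messages = _prepare_llm_call(executor_context, messages, printer)
    answer = llm.call(messages)
    # Validate the answer is non-empty and run after hooks.
    return _validate_and_finalize_llm_response(answer, executor_context, printer)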
def get_llm_response(
llm: LLM | BaseLLM,
messages: list[LLMMessage],
@@ -358,11 +421,7 @@ def get_llm_response(
Exception: If an error occurs.
ValueError: If the response is None or empty.
"""
if executor_context is not None:
if not _setup_before_llm_call_hooks(executor_context, printer, verbose=verbose):
raise ValueError("LLM call blocked by before_llm_call hook")
messages = executor_context.messages
messages = _prepare_llm_call(executor_context, messages, printer, verbose=verbose)
try:
answer = llm.call(
@@ -376,16 +435,9 @@ def get_llm_response(
)
except Exception as e:
raise e
if not answer:
if verbose:
printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError("Invalid response from LLM call - None or empty.")
return _setup_after_llm_call_hooks(
executor_context, answer, printer, verbose=verbose
return _validate_and_finalize_llm_response(
answer, executor_context, printer, verbose=verbose
)
@@ -415,6 +467,7 @@ async def aget_llm_response(
from_agent: Optional agent context for the LLM call.
response_model: Optional Pydantic model for structured outputs.
executor_context: Optional executor context for hook invocation.
verbose: Whether to print output.
Returns:
The response from the LLM as a string, Pydantic model (when response_model is provided),
@@ -424,10 +477,7 @@ async def aget_llm_response(
Exception: If an error occurs.
ValueError: If the response is None or empty.
"""
if executor_context is not None:
if not _setup_before_llm_call_hooks(executor_context, printer, verbose=verbose):
raise ValueError("LLM call blocked by before_llm_call hook")
messages = executor_context.messages
messages = _prepare_llm_call(executor_context, messages, printer, verbose=verbose)
try:
answer = await llm.acall(
@@ -441,16 +491,9 @@ async def aget_llm_response(
)
except Exception as e:
raise e
if not answer:
if verbose:
printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError("Invalid response from LLM call - None or empty.")
return _setup_after_llm_call_hooks(
executor_context, answer, printer, verbose=verbose
return _validate_and_finalize_llm_response(
answer, executor_context, printer, verbose=verbose
)
@@ -939,6 +982,385 @@ def extract_tool_call_info(
return None
def is_tool_call_list(response: list[Any]) -> bool:
"""Check if a response from the LLM is a list of tool calls.
Supports OpenAI, Anthropic, Bedrock, and Gemini formats.
Args:
response: The response to check.
Returns:
True if the response appears to be a list of tool calls.
"""
if not response:
return False
first_item = response[0]
# OpenAI-style
if hasattr(first_item, "function") or (
isinstance(first_item, dict) and "function" in first_item
):
return True
# Anthropic-style (ToolUseBlock)
if (
hasattr(first_item, "type")
and getattr(first_item, "type", None) == "tool_use"
):
return True
if hasattr(first_item, "name") and hasattr(first_item, "input"):
return True
# Bedrock-style
if (
isinstance(first_item, dict)
and "name" in first_item
and "input" in first_item
):
return True
# Gemini-style
if hasattr(first_item, "function_call") and first_item.function_call:
return True
return False
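A few sanity checks on the detection above, using illustrative dict payloads rather than real provider objects (the dict forms exercise the OpenAI and Bedrock branches; the attribute-based branches need actual SDK objects):

assert is_tool_call_list([{"function": {"name": "search", "arguments": "{}"}}])  # OpenAI-style dict
assert is_tool_call_list([{"name": "search", "input": {"q": "crewai"}}])         # Bedrock-style dict
assert not is_tool_call_list([])                      # empty response
assert not is_tool_call_list(["plain text response"]) # text, not tool calls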
def check_native_tool_support(llm: Any, original_tools: list[BaseTool] | None) -> bool:
"""Check if the LLM supports native function calling and tools are available.
Args:
llm: The LLM instance.
original_tools: Original BaseTool instances.
Returns:
True if native function calling is supported and tools exist.
"""
return (
hasattr(llm, "supports_function_calling")
and callable(getattr(llm, "supports_function_calling", None))
and llm.supports_function_calling()
and bool(original_tools)
)
def setup_native_tools(
original_tools: list[BaseTool],
) -> tuple[list[dict[str, Any]], dict[str, Callable[..., Any]]]:
"""Convert tools to OpenAI schema format for native function calling.
Args:
original_tools: Original BaseTool instances.
Returns:
Tuple of (openai_tools_schema, available_functions_dict).
"""
return convert_tools_to_openai_schema(original_tools)
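Typical wiring of these two helpers, mirroring the executor constructors earlier in this diff (llm and tools are hypothetical stand-ins for the real LLM and BaseTool instances):

openai_tools = []
available_functions = {}
if check_native_tool_support(llm, tools) and tools:
    openai_tools, available_functions = setup_native_tools(tools)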
def build_tool_calls_assistant_message(
tool_calls: list[Any],
) -> tuple[LLMMessage | None, list[dict[str, Any]]]:
"""Build an assistant message containing tool call reports.
Extracts info from each tool call, builds the standard assistant message
format, and preserves raw Gemini parts when applicable.
Args:
tool_calls: Raw tool call objects from the LLM response.
Returns:
Tuple of (assistant_message, tool_calls_to_report).
assistant_message is None if no valid tool calls found.
"""
tool_calls_to_report: list[dict[str, Any]] = []
for tool_call in tool_calls:
info = extract_tool_call_info(tool_call)
if not info:
continue
call_id, func_name, func_args = info
tool_calls_to_report.append(
{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": func_args
if isinstance(func_args, str)
else json.dumps(func_args),
},
}
)
if not tool_calls_to_report:
return None, []
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": tool_calls_to_report,
}
# Preserve raw parts for Gemini compatibility
if all(type(tc).__qualname__ == "Part" for tc in tool_calls):
assistant_message["raw_tool_call_parts"] = list(tool_calls)
return assistant_message, tool_calls_to_report
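For reference, the resulting message shape for a single OpenAI-style call, assuming extract_tool_call_info recognizes this dict form (the call payload is illustrative):

calls = [{"id": "call_1", "function": {"name": "search", "arguments": '{"q": "x"}'}}]
message, reports = build_tool_calls_assistant_message(calls)
# Expected shape, if the call is recognized:
# {"role": "assistant", "content": None, "tool_calls": [
#     {"id": "call_1", "type": "function",
#      "function": {"name": "search", "arguments": '{"q": "x"}'}}]}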
@dataclass
class NativeToolCallResult:
"""Result from executing a single native tool call."""
call_id: str
func_name: str
result: str
from_cache: bool = False
result_as_answer: bool = False
tool_message: LLMMessage = field(default_factory=dict) # type: ignore[assignment]
def execute_single_native_tool_call(
tool_call: Any,
*,
available_functions: dict[str, Callable[..., Any]],
original_tools: list[BaseTool],
structured_tools: list[CrewStructuredTool] | None,
tools_handler: ToolsHandler | None,
agent: Agent | None,
task: Task | None,
crew: Any | None,
event_source: Any,
printer: Printer | None = None,
verbose: bool = False,
) -> NativeToolCallResult:
"""Execute a single native tool call with full lifecycle management.
Handles: arg parsing, tool lookup, max-usage check, cache read/write,
before/after hooks, event emission, and result_as_answer detection.
Args:
tool_call: Raw tool call object from the LLM.
available_functions: Map of sanitized tool name -> callable.
original_tools: Original BaseTool list (for cache_function, result_as_answer).
structured_tools: Structured tools list (for hook context).
tools_handler: Optional handler with cache.
agent: The agent instance.
task: The current task.
crew: The crew instance.
event_source: The object to use as event emitter source.
printer: Optional printer for verbose logging.
verbose: Whether to print verbose output.
Returns:
NativeToolCallResult with all execution details.
"""
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.hooks.tool_hooks import (
ToolCallHookContext,
get_after_tool_call_hooks,
get_before_tool_call_hooks,
)
info = extract_tool_call_info(tool_call)
if not info:
return NativeToolCallResult(
call_id="", func_name="", result="Unrecognized tool call format"
)
call_id, func_name, func_args = info
# Parse arguments
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
agent_key = getattr(agent, "key", "unknown") if agent else "unknown"
# Find original tool for cache_function and result_as_answer
original_tool: BaseTool | None = None
for tool in original_tools:
if sanitize_tool_name(tool.name) == func_name:
original_tool = tool
break
# Check max usage count
max_usage_reached = False
if (
original_tool
and original_tool.max_usage_count is not None
and original_tool.current_usage_count >= original_tool.max_usage_count
):
max_usage_reached = True
# Check cache
from_cache = False
input_str = json.dumps(args_dict) if args_dict else ""
result = "Tool not found"
if tools_handler and tools_handler.cache:
cached_result = tools_handler.cache.read(tool=func_name, input=input_str)
if cached_result is not None:
result = (
str(cached_result) if not isinstance(cached_result, str) else cached_result
)
from_cache = True
# Emit tool started event
started_at = datetime.now()
crewai_event_bus.emit(
event_source,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=agent,
from_task=task,
agent_key=agent_key,
),
)
track_delegation_if_needed(func_name, args_dict, task)
# Find structured tool for hooks
structured_tool: CrewStructuredTool | None = None
for structured in structured_tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
# Before hooks
hook_blocked = False
before_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
agent=agent,
task=task,
crew=crew,
)
try:
for hook in get_before_tool_call_hooks():
if hook(before_hook_context) is False:
hook_blocked = True
break
except Exception: # noqa: S110
pass
error_event_emitted = False
if hook_blocked:
result = f"Tool execution blocked by hook. Tool: {func_name}"
elif not from_cache and not max_usage_reached:
if func_name in available_functions:
try:
tool_func = available_functions[func_name]
raw_result = tool_func(**args_dict)
# Cache result
if tools_handler and tools_handler.cache:
should_cache = True
if original_tool:
should_cache = original_tool.cache_function(args_dict, raw_result)
if should_cache:
tools_handler.cache.add(
tool=func_name, input=input_str, output=raw_result
)
result = (
str(raw_result) if not isinstance(raw_result, str) else raw_result
)
except Exception as e:
result = f"Error executing tool: {e}"
if task:
task.increment_tools_errors()
crewai_event_bus.emit(
event_source,
event=ToolUsageErrorEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=agent,
from_task=task,
agent_key=agent_key,
error=e,
),
)
error_event_emitted = True
elif max_usage_reached and original_tool:
result = (
f"Tool '{func_name}' has reached its usage limit of "
f"{original_tool.max_usage_count} times and cannot be used anymore."
)
# After hooks
after_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
agent=agent,
task=task,
crew=crew,
tool_result=result,
)
try:
for after_hook in get_after_tool_call_hooks():
hook_result = after_hook(after_hook_context)
if hook_result is not None:
result = hook_result
after_hook_context.tool_result = result
except Exception: # noqa: S110
pass
# Emit tool finished event (only if error event wasn't already emitted)
if not error_event_emitted:
crewai_event_bus.emit(
event_source,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=agent,
from_task=task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Build tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"name": func_name,
"content": result,
}
if verbose and printer:
cache_info = " (from cache)" if from_cache else ""
printer.print(
content=f"Tool {func_name} executed with result{cache_info}: {result[:200]}...",
color="green",
)
# Check result_as_answer
is_result_as_answer = bool(
original_tool
and hasattr(original_tool, "result_as_answer")
and original_tool.result_as_answer
)
return NativeToolCallResult(
call_id=call_id,
func_name=func_name,
result=result,
from_cache=from_cache,
result_as_answer=is_result_as_answer,
tool_message=tool_message,
)
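Both executors now reduce their tool loops to the same consumption pattern. A condensed sketch of a caller, with the context objects (executor, tools_handler, agent, task, crew) as hypothetical stand-ins:

final_answer = None
for tool_call in pending_tool_calls:
    call_result = execute_single_native_tool_call(
        tool_call,
        available_functions=available_functions,
        original_tools=original_tools,
        structured_tools=structured_tools,
        tools_handler=tools_handler,
        agent=agent,
        task=task,
        crew=crew,
        event_source=executor,
    )
    if call_result.tool_message:
        messages.append(call_result.tool_message)
    if call_result.result_as_answer:
        # Tool output becomes the final answer; stop the loop.
        final_answer = call_result.result
        break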
def _setup_before_llm_call_hooks(
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None,
printer: Printer,