supporting parallel tool use (#4513)

* supporting parallel tool use

* ensure we respect max_usage_count

* ensure result_as_answer, hooks, and cache parodity

* improve crew agent executor

* address test comments
This commit is contained in:
Lorenze Jay
2026-02-19 14:07:28 -08:00
committed by GitHub
parent 49aa29bb41
commit d09656664d
19 changed files with 3981 additions and 881 deletions

View File

@@ -7,6 +7,7 @@ and memory management.
from __future__ import annotations
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
from typing import TYPE_CHECKING, Any, Literal, cast
@@ -685,30 +686,138 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
AgentFinish if tool has result_as_answer=True, None otherwise.
"""
from datetime import datetime
import json
from crewai.events import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
if not tool_calls:
return None
# Only process the FIRST tool call for sequential execution with reflection
tool_call = tool_calls[0]
parsed_calls = [
parsed
for tool_call in tool_calls
if (parsed := self._parse_native_tool_call(tool_call)) is not None
]
if not parsed_calls:
return None
# Extract tool call info - handle OpenAI-style, Anthropic-style, and Gemini-style
original_tools_by_name: dict[str, Any] = {}
for tool in self.original_tools or []:
original_tools_by_name[sanitize_tool_name(tool.name)] = tool
if len(parsed_calls) > 1:
has_result_as_answer_in_batch = any(
bool(
original_tools_by_name.get(func_name)
and getattr(
original_tools_by_name.get(func_name), "result_as_answer", False
)
)
for _, func_name, _ in parsed_calls
)
has_max_usage_count_in_batch = any(
bool(
original_tools_by_name.get(func_name)
and getattr(
original_tools_by_name.get(func_name),
"max_usage_count",
None,
)
is not None
)
for _, func_name, _ in parsed_calls
)
# Preserve historical sequential behavior for result_as_answer batches.
# Also avoid threading around usage counters for max_usage_count tools.
if has_result_as_answer_in_batch or has_max_usage_count_in_batch:
logger.debug(
"Skipping parallel native execution because batch includes result_as_answer or max_usage_count tool"
)
else:
execution_plan: list[
tuple[str, str, str | dict[str, Any], Any | None]
] = []
for call_id, func_name, func_args in parsed_calls:
original_tool = original_tools_by_name.get(func_name)
execution_plan.append((call_id, func_name, func_args, original_tool))
self._append_assistant_tool_calls_message(
[
(call_id, func_name, func_args)
for call_id, func_name, func_args, _ in execution_plan
]
)
max_workers = min(8, len(execution_plan))
ordered_results: list[dict[str, Any] | None] = [None] * len(execution_plan)
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {
pool.submit(
self._execute_single_native_tool_call,
call_id=call_id,
func_name=func_name,
func_args=func_args,
available_functions=available_functions,
original_tool=original_tool,
should_execute=True,
): idx
for idx, (
call_id,
func_name,
func_args,
original_tool,
) in enumerate(execution_plan)
}
for future in as_completed(futures):
idx = futures[future]
ordered_results[idx] = future.result()
for execution_result in ordered_results:
if not execution_result:
continue
tool_finish = self._append_tool_result_and_check_finality(
execution_result
)
if tool_finish:
return tool_finish
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.messages.append(reasoning_message)
return None
# Sequential behavior: process only first tool call, then force reflection.
call_id, func_name, func_args = parsed_calls[0]
self._append_assistant_tool_calls_message([(call_id, func_name, func_args)])
execution_result = self._execute_single_native_tool_call(
call_id=call_id,
func_name=func_name,
func_args=func_args,
available_functions=available_functions,
original_tool=original_tools_by_name.get(func_name),
should_execute=True,
)
tool_finish = self._append_tool_result_and_check_finality(execution_result)
if tool_finish:
return tool_finish
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.messages.append(reasoning_message)
return None
def _parse_native_tool_call(
self, tool_call: Any
) -> tuple[str, str, str | dict[str, Any]] | None:
if hasattr(tool_call, "function"):
# OpenAI-style: has .function.name and .function.arguments
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = sanitize_tool_name(tool_call.function.name)
func_args = tool_call.function.arguments
elif hasattr(tool_call, "function_call") and tool_call.function_call:
# Gemini-style: has .function_call.name and .function_call.args
return call_id, func_name, tool_call.function.arguments
if hasattr(tool_call, "function_call") and tool_call.function_call:
call_id = f"call_{id(tool_call)}"
func_name = sanitize_tool_name(tool_call.function_call.name)
func_args = (
@@ -716,13 +825,12 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
if tool_call.function_call.args
else {}
)
elif hasattr(tool_call, "name") and hasattr(tool_call, "input"):
# Anthropic format: has .name and .input (ToolUseBlock)
return call_id, func_name, func_args
if hasattr(tool_call, "name") and hasattr(tool_call, "input"):
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = sanitize_tool_name(tool_call.name)
func_args = tool_call.input # Already a dict in Anthropic
elif isinstance(tool_call, dict):
# Support OpenAI "id", Bedrock "toolUseId", or generate one
return call_id, func_name, tool_call.input
if isinstance(tool_call, dict):
call_id = (
tool_call.get("id")
or tool_call.get("toolUseId")
@@ -733,10 +841,15 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
func_info.get("name", "") or tool_call.get("name", "")
)
func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
else:
return None
return call_id, func_name, func_args
return None
def _append_assistant_tool_calls_message(
self,
parsed_calls: list[tuple[str, str, str | dict[str, Any]]],
) -> None:
import json
# Append assistant message with single tool call
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
@@ -751,12 +864,30 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
else json.dumps(func_args),
},
}
for call_id, func_name, func_args in parsed_calls
],
}
self.messages.append(assistant_message)
# Parse arguments for the single tool call
def _execute_single_native_tool_call(
self,
*,
call_id: str,
func_name: str,
func_args: str | dict[str, Any],
available_functions: dict[str, Callable[..., Any]],
original_tool: Any | None = None,
should_execute: bool = True,
) -> dict[str, Any]:
from datetime import datetime
import json
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
@@ -765,28 +896,26 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
else:
args_dict = func_args
agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown"
if original_tool is None:
for tool in self.original_tools or []:
if sanitize_tool_name(tool.name) == func_name:
original_tool = tool
break
# Find original tool by matching sanitized name (needed for cache_function and result_as_answer)
original_tool = None
for tool in self.original_tools or []:
if sanitize_tool_name(tool.name) == func_name:
original_tool = tool
break
# Check if tool has reached max usage count
max_usage_reached = False
if original_tool:
if (
hasattr(original_tool, "max_usage_count")
and original_tool.max_usage_count is not None
and original_tool.current_usage_count >= original_tool.max_usage_count
):
max_usage_reached = True
if not should_execute and original_tool:
max_usage_reached = True
elif (
should_execute
and original_tool
and getattr(original_tool, "max_usage_count", None) is not None
and getattr(original_tool, "current_usage_count", 0)
>= original_tool.max_usage_count
):
max_usage_reached = True
# Check cache before executing
from_cache = False
result: str = "Tool not found"
input_str = json.dumps(args_dict) if args_dict else ""
if self.tools_handler and self.tools_handler.cache:
cached_result = self.tools_handler.cache.read(
@@ -800,7 +929,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
from_cache = True
# Emit tool usage started event
agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown"
started_at = datetime.now()
crewai_event_bus.emit(
self,
@@ -816,14 +945,12 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
track_delegation_if_needed(func_name, args_dict, self.task)
# Find the structured tool for hook context
structured_tool: CrewStructuredTool | None = None
for structured in self.tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
# Execute before_tool_call hooks
hook_blocked = False
before_hook_context = ToolCallHookContext(
tool_name=func_name,
@@ -847,58 +974,44 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
color="red",
)
# If hook blocked execution, set result and skip tool execution
if hook_blocked:
result = f"Tool execution blocked by hook. Tool: {func_name}"
# Execute the tool (only if not cached, not at max usage, and not blocked by hook)
elif not from_cache and not max_usage_reached:
result = "Tool not found"
if func_name in available_functions:
try:
tool_func = available_functions[func_name]
raw_result = tool_func(**args_dict)
# Add to cache after successful execution (before string conversion)
if self.tools_handler and self.tools_handler.cache:
should_cache = True
if (
original_tool
and hasattr(original_tool, "cache_function")
and callable(original_tool.cache_function)
):
should_cache = original_tool.cache_function(
args_dict, raw_result
)
if should_cache:
self.tools_handler.cache.add(
tool=func_name, input=input_str, output=raw_result
)
# Convert to string for message
result = (
str(raw_result)
if not isinstance(raw_result, str)
else raw_result
)
except Exception as e:
result = f"Error executing tool: {e}"
if self.task:
self.task.increment_tools_errors()
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
error=e,
),
)
error_event_emitted = True
elif max_usage_reached and original_tool:
# Return error message when max usage limit is reached
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
elif not from_cache and func_name in available_functions:
try:
raw_result = available_functions[func_name](**args_dict)
if self.tools_handler and self.tools_handler.cache:
should_cache = True
if (
original_tool
and hasattr(original_tool, "cache_function")
and callable(original_tool.cache_function)
):
should_cache = original_tool.cache_function(args_dict, raw_result)
if should_cache:
self.tools_handler.cache.add(
tool=func_name, input=input_str, output=raw_result
)
result = str(raw_result) if not isinstance(raw_result, str) else raw_result
except Exception as e:
result = f"Error executing tool: {e}"
if self.task:
self.task.increment_tools_errors()
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
error=e,
),
)
error_event_emitted = True
after_hook_context = ToolCallHookContext(
tool_name=func_name,
@@ -938,7 +1051,23 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
),
)
# Append tool result message
return {
"call_id": call_id,
"func_name": func_name,
"result": result,
"from_cache": from_cache,
"original_tool": original_tool,
}
def _append_tool_result_and_check_finality(
self, execution_result: dict[str, Any]
) -> AgentFinish | None:
call_id = cast(str, execution_result["call_id"])
func_name = cast(str, execution_result["func_name"])
result = cast(str, execution_result["result"])
from_cache = cast(bool, execution_result["from_cache"])
original_tool = execution_result["original_tool"]
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
@@ -947,7 +1076,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
}
self.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
cache_info = " (from cache)" if from_cache else ""
self._printer.print(
@@ -960,20 +1088,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
and hasattr(original_tool, "result_as_answer")
and original_tool.result_as_answer
):
# Return immediately with tool result as final answer
return AgentFinish(
thought="Tool result is the final answer",
output=result,
text=result,
)
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.messages.append(reasoning_message)
return None
async def ainvoke(self, inputs: dict[str, Any]) -> dict[str, Any]:

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from collections.abc import Callable, Coroutine
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import json
import threading
@@ -668,9 +669,12 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
if not self.state.pending_tool_calls:
return "native_tool_completed"
pending_tool_calls = list(self.state.pending_tool_calls)
self.state.pending_tool_calls.clear()
# Group all tool calls into a single assistant message
tool_calls_to_report = []
for tool_call in self.state.pending_tool_calls:
for tool_call in pending_tool_calls:
info = extract_tool_call_info(tool_call)
if not info:
continue
@@ -695,201 +699,85 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"content": None,
"tool_calls": tool_calls_to_report,
}
if all(
type(tc).__qualname__ == "Part" for tc in self.state.pending_tool_calls
):
assistant_message["raw_tool_call_parts"] = list(
self.state.pending_tool_calls
)
if all(type(tc).__qualname__ == "Part" for tc in pending_tool_calls):
assistant_message["raw_tool_call_parts"] = list(pending_tool_calls)
self.state.messages.append(assistant_message)
# Now execute each tool
while self.state.pending_tool_calls:
tool_call = self.state.pending_tool_calls.pop(0)
info = extract_tool_call_info(tool_call)
if not info:
continue
runnable_tool_calls = [
tool_call
for tool_call in pending_tool_calls
if extract_tool_call_info(tool_call) is not None
]
should_parallelize = self._should_parallelize_native_tool_calls(
runnable_tool_calls
)
call_id, func_name, func_args = info
# Parse arguments
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Get agent_key for event tracking
agent_key = (
getattr(self.agent, "key", "unknown") if self.agent else "unknown"
)
# Find original tool by matching sanitized name (needed for cache_function and result_as_answer)
original_tool = None
for tool in self.original_tools or []:
if sanitize_tool_name(tool.name) == func_name:
original_tool = tool
break
# Check if tool has reached max usage count
max_usage_reached = False
if (
original_tool
and original_tool.max_usage_count is not None
and original_tool.current_usage_count >= original_tool.max_usage_count
):
max_usage_reached = True
# Check cache before executing
from_cache = False
input_str = json.dumps(args_dict) if args_dict else ""
if self.tools_handler and self.tools_handler.cache:
cached_result = self.tools_handler.cache.read(
tool=func_name, input=input_str
execution_results: list[dict[str, Any]] = []
if should_parallelize:
max_workers = min(8, len(runnable_tool_calls))
with ThreadPoolExecutor(max_workers=max_workers) as pool:
future_to_idx = {
pool.submit(self._execute_single_native_tool_call, tool_call): idx
for idx, tool_call in enumerate(runnable_tool_calls)
}
ordered_results: list[dict[str, Any] | None] = [None] * len(
runnable_tool_calls
)
if cached_result is not None:
result = (
str(cached_result)
if not isinstance(cached_result, str)
else cached_result
)
from_cache = True
for future in as_completed(future_to_idx):
idx = future_to_idx[future]
ordered_results[idx] = future.result()
execution_results = [
result for result in ordered_results if result is not None
]
else:
# Execute sequentially so result_as_answer tools can short-circuit
# immediately without running remaining calls.
for tool_call in runnable_tool_calls:
execution_result = self._execute_single_native_tool_call(tool_call)
call_id = cast(str, execution_result["call_id"])
func_name = cast(str, execution_result["func_name"])
result = cast(str, execution_result["result"])
from_cache = cast(bool, execution_result["from_cache"])
original_tool = execution_result["original_tool"]
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
),
)
error_event_emitted = False
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"name": func_name,
"content": result,
}
self.state.messages.append(tool_message)
track_delegation_if_needed(func_name, args_dict, self.task)
structured_tool: CrewStructuredTool | None = None
for structured in self.tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
hook_blocked = False
before_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
agent=self.agent,
task=self.task,
crew=self.crew,
)
before_hooks = get_before_tool_call_hooks()
try:
for hook in before_hooks:
hook_result = hook(before_hook_context)
if hook_result is False:
hook_blocked = True
break
except Exception as hook_error:
if self.agent.verbose:
# Log the tool execution
if self.agent and self.agent.verbose:
cache_info = " (from cache)" if from_cache else ""
self._printer.print(
content=f"Error in before_tool_call hook: {hook_error}",
color="red",
content=f"Tool {func_name} executed with result{cache_info}: {result[:200]}...",
color="green",
)
if hook_blocked:
result = f"Tool execution blocked by hook. Tool: {func_name}"
elif not from_cache and not max_usage_reached:
result = "Tool not found"
if func_name in self._available_functions:
try:
tool_func = self._available_functions[func_name]
raw_result = tool_func(**args_dict)
# Add to cache after successful execution (before string conversion)
if self.tools_handler and self.tools_handler.cache:
should_cache = True
if original_tool:
should_cache = original_tool.cache_function(
args_dict, raw_result
)
if should_cache:
self.tools_handler.cache.add(
tool=func_name, input=input_str, output=raw_result
)
# Convert to string for message
result = (
str(raw_result)
if not isinstance(raw_result, str)
else raw_result
)
except Exception as e:
result = f"Error executing tool: {e}"
if self.task:
self.task.increment_tools_errors()
# Emit tool usage error event
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
error=e,
),
)
error_event_emitted = True
elif max_usage_reached and original_tool:
# Return error message when max usage limit is reached
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
# Execute after_tool_call hooks (even if blocked, to allow logging/monitoring)
after_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
agent=self.agent,
task=self.task,
crew=self.crew,
tool_result=result,
)
after_hooks = get_after_tool_call_hooks()
try:
for after_hook in after_hooks:
after_hook_result = after_hook(after_hook_context)
if after_hook_result is not None:
result = after_hook_result
after_hook_context.tool_result = result
except Exception as hook_error:
if self.agent.verbose:
self._printer.print(
content=f"Error in after_tool_call hook: {hook_error}",
color="red",
)
if not error_event_emitted:
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
if (
original_tool
and hasattr(original_tool, "result_as_answer")
and original_tool.result_as_answer
):
self.state.current_answer = AgentFinish(
thought="Tool result is the final answer",
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
)
text=result,
)
self.state.is_finished = True
return "tool_result_is_final"
return "native_tool_completed"
for execution_result in execution_results:
call_id = cast(str, execution_result["call_id"])
func_name = cast(str, execution_result["func_name"])
result = cast(str, execution_result["result"])
from_cache = cast(bool, execution_result["from_cache"])
original_tool = execution_result["original_tool"]
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
@@ -922,6 +810,224 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "native_tool_completed"
def _should_parallelize_native_tool_calls(self, tool_calls: list[Any]) -> bool:
"""Determine if native tool calls are safe to run in parallel."""
if len(tool_calls) <= 1:
return False
for tool_call in tool_calls:
info = extract_tool_call_info(tool_call)
if not info:
continue
_, func_name, _ = info
original_tool = None
for tool in self.original_tools or []:
if sanitize_tool_name(tool.name) == func_name:
original_tool = tool
break
if not original_tool:
continue
if getattr(original_tool, "result_as_answer", False):
return False
if getattr(original_tool, "max_usage_count", None) is not None:
return False
return True
def _execute_single_native_tool_call(self, tool_call: Any) -> dict[str, Any]:
"""Execute a single native tool call and return metadata/result."""
info = extract_tool_call_info(tool_call)
if not info:
raise ValueError("Invalid native tool call format")
call_id, func_name, func_args = info
# Parse arguments
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Get agent_key for event tracking
agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown"
# Find original tool by matching sanitized name (needed for cache_function and result_as_answer)
original_tool = None
for tool in self.original_tools or []:
if sanitize_tool_name(tool.name) == func_name:
original_tool = tool
break
# Check if tool has reached max usage count
max_usage_reached = False
if (
original_tool
and original_tool.max_usage_count is not None
and original_tool.current_usage_count >= original_tool.max_usage_count
):
max_usage_reached = True
# Check cache before executing
from_cache = False
input_str = json.dumps(args_dict) if args_dict else ""
if self.tools_handler and self.tools_handler.cache:
cached_result = self.tools_handler.cache.read(
tool=func_name, input=input_str
)
if cached_result is not None:
result = (
str(cached_result)
if not isinstance(cached_result, str)
else cached_result
)
from_cache = True
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
),
)
error_event_emitted = False
track_delegation_if_needed(func_name, args_dict, self.task)
structured_tool: CrewStructuredTool | None = None
for structured in self.tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
hook_blocked = False
before_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
agent=self.agent,
task=self.task,
crew=self.crew,
)
before_hooks = get_before_tool_call_hooks()
try:
for hook in before_hooks:
hook_result = hook(before_hook_context)
if hook_result is False:
hook_blocked = True
break
except Exception as hook_error:
if self.agent.verbose:
self._printer.print(
content=f"Error in before_tool_call hook: {hook_error}",
color="red",
)
if hook_blocked:
result = f"Tool execution blocked by hook. Tool: {func_name}"
elif not from_cache and not max_usage_reached:
result = "Tool not found"
if func_name in self._available_functions:
try:
tool_func = self._available_functions[func_name]
raw_result = tool_func(**args_dict)
# Add to cache after successful execution (before string conversion)
if self.tools_handler and self.tools_handler.cache:
should_cache = True
if original_tool:
should_cache = original_tool.cache_function(
args_dict, raw_result
)
if should_cache:
self.tools_handler.cache.add(
tool=func_name, input=input_str, output=raw_result
)
# Convert to string for message
result = (
str(raw_result)
if not isinstance(raw_result, str)
else raw_result
)
except Exception as e:
result = f"Error executing tool: {e}"
if self.task:
self.task.increment_tools_errors()
# Emit tool usage error event
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
error=e,
),
)
error_event_emitted = True
elif max_usage_reached and original_tool:
# Return error message when max usage limit is reached
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
# Execute after_tool_call hooks (even if blocked, to allow logging/monitoring)
after_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
agent=self.agent,
task=self.task,
crew=self.crew,
tool_result=result,
)
after_hooks = get_after_tool_call_hooks()
try:
for after_hook in after_hooks:
after_hook_result = after_hook(after_hook_context)
if after_hook_result is not None:
result = after_hook_result
after_hook_context.tool_result = result
except Exception as hook_error:
if self.agent.verbose:
self._printer.print(
content=f"Error in after_tool_call hook: {hook_error}",
color="red",
)
if not error_event_emitted:
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
)
return {
"call_id": call_id,
"func_name": func_name,
"result": result,
"from_cache": from_cache,
"original_tool": original_tool,
}
def _extract_tool_name(self, tool_call: Any) -> str:
"""Extract tool name from various tool call formats."""
if hasattr(tool_call, "function"):