Refactor AgentExecutor to support batch execution of native tool calls

- Updated the  method to process all tools from  in a single batch, enhancing efficiency and reducing the number of interactions with the LLM.
- Introduced a new utility function  to streamline the extraction of tool call details, improving compatibility with various tool formats.
- Removed the  parameter, simplifying the initialization of the .
- Enhanced logging and message handling to provide clearer insights during tool execution.
- This refactor improves the overall performance and usability of the agent execution flow.
This commit is contained in:
lorenzejay
2026-01-21 13:03:06 -08:00
parent e562a06836
commit 56dd2f82a4

View File

@@ -50,6 +50,7 @@ from crewai.utilities.agent_utils import (
is_context_length_exceeded, is_context_length_exceeded,
is_inside_event_loop, is_inside_event_loop,
process_llm_response, process_llm_response,
extract_tool_call_info,
) )
from crewai.utilities.constants import TRAINING_DATA_FILE from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.i18n import I18N, get_i18n from crewai.utilities.i18n import I18N, get_i18n
@@ -124,7 +125,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
callbacks: list[Any] | None = None, callbacks: list[Any] | None = None,
response_model: type[BaseModel] | None = None, response_model: type[BaseModel] | None = None,
i18n: I18N | None = None, i18n: I18N | None = None,
max_tools_per_turn: int = 10,
) -> None: ) -> None:
"""Initialize the flow-based agent executor. """Initialize the flow-based agent executor.
@@ -169,7 +169,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self.respect_context_window = respect_context_window self.respect_context_window = respect_context_window
self.request_within_rpm_limit = request_within_rpm_limit self.request_within_rpm_limit = request_within_rpm_limit
self.response_model = response_model self.response_model = response_model
self.max_tools_per_turn = max_tools_per_turn
self.log_error_after = 3 self.log_error_after = 3
self._console: Console = Console() self._console: Console = Console()
@@ -482,6 +481,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
if isinstance(answer, list) and answer and self._is_tool_call_list(answer): if isinstance(answer, list) and answer and self._is_tool_call_list(answer):
# Store tool calls for sequential processing # Store tool calls for sequential processing
self.state.pending_tool_calls = list(answer) self.state.pending_tool_calls = list(answer)
iteration_elapsed = time.time() - iteration_start iteration_elapsed = time.time() - iteration_start
print( print(
f"[{time.strftime('%H:%M:%S')}] -> Routing to native_tool_calls ({len(answer)} tools)" f"[{time.strftime('%H:%M:%S')}] -> Routing to native_tool_calls ({len(answer)} tools)"
@@ -608,55 +608,23 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
@listen("native_tool_calls") @listen("native_tool_calls")
def execute_native_tool(self) -> Literal["native_tool_completed"]: def execute_native_tool(self) -> Literal["native_tool_completed"]:
"""Execute a SINGLE native tool call with reflection after. """Execute native tool calls in a batch.
Processes only the first tool from pending_tool_calls, then asks Processes all tools from pending_tool_calls, executes them,
the LLM if it can answer the task. Remaining tools stay in the queue and appends results to the conversation history.
for potential execution on next iteration.
""" """
if not self.state.pending_tool_calls: if not self.state.pending_tool_calls:
return "native_tool_completed" return "native_tool_completed"
# Pop just the first tool (leave the rest in queue for potential continuation) # Group all tool calls into a single assistant message
tool_call = self.state.pending_tool_calls.pop(0) tool_calls_to_report = []
print( for tool_call in self.state.pending_tool_calls:
f"Executing 1 tool, {len(self.state.pending_tool_calls)} remaining in queue" info = extract_tool_call_info(tool_call)
) if not info:
continue
# Extract tool call info - handle OpenAI, Anthropic, and Gemini formats call_id, func_name, func_args = info
if hasattr(tool_call, "function"): tool_calls_to_report.append(
# OpenAI format: has .function.name and .function.arguments
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.function.name
func_args = tool_call.function.arguments
elif hasattr(tool_call, "function_call") and tool_call.function_call:
# Gemini format: has .function_call.name and .function_call.args
call_id = f"call_{id(tool_call)}"
func_name = tool_call.function_call.name
func_args = (
dict(tool_call.function_call.args)
if tool_call.function_call.args
else {}
)
elif hasattr(tool_call, "name") and hasattr(tool_call, "input"):
# Anthropic format: has .name and .input (ToolUseBlock)
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.name
func_args = tool_call.input # Already a dict in Anthropic
elif isinstance(tool_call, dict):
call_id = tool_call.get("id", f"call_{id(tool_call)}")
func_info = tool_call.get("function", {})
func_name = func_info.get("name", "") or tool_call.get("name", "")
func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
else:
# Unrecognized format - skip and try next
return "native_tool_completed"
# Append assistant message with single tool call
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": [
{ {
"id": call_id, "id": call_id,
"type": "function", "type": "function",
@@ -667,90 +635,97 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
else json.dumps(func_args), else json.dumps(func_args),
}, },
} }
],
}
self.state.messages.append(assistant_message)
# Parse arguments for the single tool call
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
),
)
# Execute the tool
result = "Tool not found"
if func_name in self._available_functions:
try:
tool_func = self._available_functions[func_name]
result = tool_func(**args_dict)
if not isinstance(result, str):
result = str(result)
except Exception as e:
result = f"Error executing tool: {e}"
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"content": result,
}
self.state.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
self._printer.print(
content=f"Tool {func_name} executed with result: {result[:200]}...",
color="green",
) )
# Only add reflection prompt if there are still pending tools if tool_calls_to_report:
# If no pending tools, skip reflection - LLM will naturally continue assistant_message: LLMMessage = {
if self.state.pending_tool_calls: "role": "assistant",
print("--------------------------------") "content": None,
print( "tool_calls": tool_calls_to_report,
f"REFLECTION: {len(self.state.pending_tool_calls)} tools pending - asking LLM to decide"
)
print("--------------------------------")
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
} }
self.state.messages.append(reasoning_message) self.state.messages.append(assistant_message)
else:
print("--------------------------------") # Now execute each tool
print("SKIPPING REFLECTION: No pending tools - LLM will continue naturally") while self.state.pending_tool_calls:
print("--------------------------------") tool_call = self.state.pending_tool_calls.pop(0)
info = extract_tool_call_info(tool_call)
if not info:
continue
call_id, func_name, func_args = info
# Parse arguments
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
),
)
# Execute the tool
result = "Tool not found"
if func_name in self._available_functions:
try:
tool_func = self._available_functions[func_name]
result = tool_func(**args_dict)
if not isinstance(result, str):
result = str(result)
except Exception as e:
result = f"Error executing tool: {e}"
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"content": result,
}
self.state.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
self._printer.print(
content=f"Tool {func_name} executed with result: {result[:200]}...",
color="green",
)
# Add reflection prompt once after all tools in the batch
print("--------------------------------")
print("BATCH COMPLETED: All pending tools executed - adding reflection prompt")
print("--------------------------------")
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.state.messages.append(reasoning_message)
return "native_tool_completed" return "native_tool_completed"