From 56dd2f82a48e6e32d09b9f16e6e96604694fa253 Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Wed, 21 Jan 2026 13:03:06 -0800 Subject: [PATCH] Refactor AgentExecutor to support batch execution of native tool calls - Updated the method to process all tools from in a single batch, enhancing efficiency and reducing the number of interactions with the LLM. - Introduced a new utility function to streamline the extraction of tool call details, improving compatibility with various tool formats. - Removed the parameter, simplifying the initialization of the . - Enhanced logging and message handling to provide clearer insights during tool execution. - This refactor improves the overall performance and usability of the agent execution flow. --- .../src/crewai/experimental/agent_executor.py | 227 ++++++++---------- 1 file changed, 101 insertions(+), 126 deletions(-) diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index ac664ff68..b8e2ad7f1 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -50,6 +50,7 @@ from crewai.utilities.agent_utils import ( is_context_length_exceeded, is_inside_event_loop, process_llm_response, + extract_tool_call_info, ) from crewai.utilities.constants import TRAINING_DATA_FILE from crewai.utilities.i18n import I18N, get_i18n @@ -124,7 +125,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): callbacks: list[Any] | None = None, response_model: type[BaseModel] | None = None, i18n: I18N | None = None, - max_tools_per_turn: int = 10, ) -> None: """Initialize the flow-based agent executor. @@ -169,7 +169,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): self.respect_context_window = respect_context_window self.request_within_rpm_limit = request_within_rpm_limit self.response_model = response_model - self.max_tools_per_turn = max_tools_per_turn self.log_error_after = 3 self._console: Console = Console() @@ -482,6 +481,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): if isinstance(answer, list) and answer and self._is_tool_call_list(answer): # Store tool calls for sequential processing self.state.pending_tool_calls = list(answer) + iteration_elapsed = time.time() - iteration_start print( f"[{time.strftime('%H:%M:%S')}] -> Routing to native_tool_calls ({len(answer)} tools)" @@ -608,55 +608,23 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): @listen("native_tool_calls") def execute_native_tool(self) -> Literal["native_tool_completed"]: - """Execute a SINGLE native tool call with reflection after. + """Execute native tool calls in a batch. - Processes only the first tool from pending_tool_calls, then asks - the LLM if it can answer the task. Remaining tools stay in the queue - for potential execution on next iteration. + Processes all tools from pending_tool_calls, executes them, + and appends results to the conversation history. """ if not self.state.pending_tool_calls: return "native_tool_completed" - # Pop just the first tool (leave the rest in queue for potential continuation) - tool_call = self.state.pending_tool_calls.pop(0) - print( - f"Executing 1 tool, {len(self.state.pending_tool_calls)} remaining in queue" - ) + # Group all tool calls into a single assistant message + tool_calls_to_report = [] + for tool_call in self.state.pending_tool_calls: + info = extract_tool_call_info(tool_call) + if not info: + continue - # Extract tool call info - handle OpenAI, Anthropic, and Gemini formats - if hasattr(tool_call, "function"): - # OpenAI format: has .function.name and .function.arguments - call_id = getattr(tool_call, "id", f"call_{id(tool_call)}") - func_name = tool_call.function.name - func_args = tool_call.function.arguments - elif hasattr(tool_call, "function_call") and tool_call.function_call: - # Gemini format: has .function_call.name and .function_call.args - call_id = f"call_{id(tool_call)}" - func_name = tool_call.function_call.name - func_args = ( - dict(tool_call.function_call.args) - if tool_call.function_call.args - else {} - ) - elif hasattr(tool_call, "name") and hasattr(tool_call, "input"): - # Anthropic format: has .name and .input (ToolUseBlock) - call_id = getattr(tool_call, "id", f"call_{id(tool_call)}") - func_name = tool_call.name - func_args = tool_call.input # Already a dict in Anthropic - elif isinstance(tool_call, dict): - call_id = tool_call.get("id", f"call_{id(tool_call)}") - func_info = tool_call.get("function", {}) - func_name = func_info.get("name", "") or tool_call.get("name", "") - func_args = func_info.get("arguments", "{}") or tool_call.get("input", {}) - else: - # Unrecognized format - skip and try next - return "native_tool_completed" - - # Append assistant message with single tool call - assistant_message: LLMMessage = { - "role": "assistant", - "content": None, - "tool_calls": [ + call_id, func_name, func_args = info + tool_calls_to_report.append( { "id": call_id, "type": "function", @@ -667,90 +635,97 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): else json.dumps(func_args), }, } - ], - } - self.state.messages.append(assistant_message) - - # Parse arguments for the single tool call - if isinstance(func_args, str): - try: - args_dict = json.loads(func_args) - except json.JSONDecodeError: - args_dict = {} - else: - args_dict = func_args - - # Emit tool usage started event - started_at = datetime.now() - crewai_event_bus.emit( - self, - event=ToolUsageStartedEvent( - tool_name=func_name, - tool_args=args_dict, - from_agent=self.agent, - from_task=self.task, - ), - ) - - # Execute the tool - result = "Tool not found" - if func_name in self._available_functions: - try: - tool_func = self._available_functions[func_name] - result = tool_func(**args_dict) - if not isinstance(result, str): - result = str(result) - except Exception as e: - result = f"Error executing tool: {e}" - - # Emit tool usage finished event - crewai_event_bus.emit( - self, - event=ToolUsageFinishedEvent( - output=result, - tool_name=func_name, - tool_args=args_dict, - from_agent=self.agent, - from_task=self.task, - started_at=started_at, - finished_at=datetime.now(), - ), - ) - - # Append tool result message - tool_message: LLMMessage = { - "role": "tool", - "tool_call_id": call_id, - "content": result, - } - self.state.messages.append(tool_message) - - # Log the tool execution - if self.agent and self.agent.verbose: - self._printer.print( - content=f"Tool {func_name} executed with result: {result[:200]}...", - color="green", ) - # Only add reflection prompt if there are still pending tools - # If no pending tools, skip reflection - LLM will naturally continue - if self.state.pending_tool_calls: - print("--------------------------------") - print( - f"REFLECTION: {len(self.state.pending_tool_calls)} tools pending - asking LLM to decide" - ) - print("--------------------------------") - reasoning_prompt = self._i18n.slice("post_tool_reasoning") - - reasoning_message: LLMMessage = { - "role": "user", - "content": reasoning_prompt, + if tool_calls_to_report: + assistant_message: LLMMessage = { + "role": "assistant", + "content": None, + "tool_calls": tool_calls_to_report, } - self.state.messages.append(reasoning_message) - else: - print("--------------------------------") - print("SKIPPING REFLECTION: No pending tools - LLM will continue naturally") - print("--------------------------------") + self.state.messages.append(assistant_message) + + # Now execute each tool + while self.state.pending_tool_calls: + tool_call = self.state.pending_tool_calls.pop(0) + info = extract_tool_call_info(tool_call) + if not info: + continue + + call_id, func_name, func_args = info + + # Parse arguments + if isinstance(func_args, str): + try: + args_dict = json.loads(func_args) + except json.JSONDecodeError: + args_dict = {} + else: + args_dict = func_args + + # Emit tool usage started event + started_at = datetime.now() + crewai_event_bus.emit( + self, + event=ToolUsageStartedEvent( + tool_name=func_name, + tool_args=args_dict, + from_agent=self.agent, + from_task=self.task, + ), + ) + + # Execute the tool + result = "Tool not found" + if func_name in self._available_functions: + try: + tool_func = self._available_functions[func_name] + result = tool_func(**args_dict) + if not isinstance(result, str): + result = str(result) + except Exception as e: + result = f"Error executing tool: {e}" + + # Emit tool usage finished event + crewai_event_bus.emit( + self, + event=ToolUsageFinishedEvent( + output=result, + tool_name=func_name, + tool_args=args_dict, + from_agent=self.agent, + from_task=self.task, + started_at=started_at, + finished_at=datetime.now(), + ), + ) + + # Append tool result message + tool_message: LLMMessage = { + "role": "tool", + "tool_call_id": call_id, + "content": result, + } + self.state.messages.append(tool_message) + + # Log the tool execution + if self.agent and self.agent.verbose: + self._printer.print( + content=f"Tool {func_name} executed with result: {result[:200]}...", + color="green", + ) + + # Add reflection prompt once after all tools in the batch + print("--------------------------------") + print("BATCH COMPLETED: All pending tools executed - adding reflection prompt") + print("--------------------------------") + reasoning_prompt = self._i18n.slice("post_tool_reasoning") + + reasoning_message: LLMMessage = { + "role": "user", + "content": reasoning_prompt, + } + self.state.messages.append(reasoning_message) return "native_tool_completed"