diff --git a/lib/crewai/src/crewai/agents/step_executor.py b/lib/crewai/src/crewai/agents/step_executor.py index e8b0a2f1a..d1d6564d6 100644 --- a/lib/crewai/src/crewai/agents/step_executor.py +++ b/lib/crewai/src/crewai/agents/step_executor.py @@ -8,8 +8,6 @@ here — moved from AgentExecutor so the outer loop stays clean. from __future__ import annotations from collections.abc import Callable -from datetime import datetime -import json import time from typing import TYPE_CHECKING, Any @@ -19,24 +17,15 @@ from crewai.agents.parser import ( AgentAction, AgentFinish, ) -from crewai.events.event_bus import crewai_event_bus -from crewai.events.types.tool_usage_events import ( - ToolUsageErrorEvent, - ToolUsageFinishedEvent, - ToolUsageStartedEvent, -) -from crewai.hooks.tool_hooks import ( - ToolCallHookContext, - get_after_tool_call_hooks, - get_before_tool_call_hooks, -) from crewai.utilities.agent_utils import ( - convert_tools_to_openai_schema, + build_tool_calls_assistant_message, + check_native_tool_support, enforce_rpm_limit, - extract_tool_call_info, + execute_single_native_tool_call, format_message_for_llm, + is_tool_call_list, process_llm_response, - track_delegation_if_needed, + setup_native_tools, ) from crewai.utilities.i18n import I18N, get_i18n from crewai.utilities.planning_types import TodoItem @@ -115,11 +104,13 @@ class StepExecutor: self._printer: Printer = Printer() # Native tool support — set up once - self._use_native_tools = self._check_native_tool_support() + self._use_native_tools = check_native_tool_support(self.llm, self.original_tools) self._openai_tools: list[dict[str, Any]] = [] self._available_functions: dict[str, Callable[..., Any]] = {} - if self._use_native_tools: - self._setup_native_tools() + if self._use_native_tools and self.original_tools: + self._openai_tools, self._available_functions = setup_native_tools( + self.original_tools + ) # ------------------------------------------------------------------ # Public API @@ -372,7 +363,7 @@ class StepExecutor: raise ValueError("Empty response from LLM") # Check if the response is a list of tool calls - if isinstance(answer, list) and answer and self._is_tool_call_list(answer): + if isinstance(answer, list) and answer and is_tool_call_list(answer): return self._execute_native_tool_calls(answer, messages, tool_calls_made) # Text response — this is the final answer @@ -395,236 +386,36 @@ class StepExecutor: Returns final answer string if a tool has result_as_answer, else None. """ - # Build assistant message with tool calls - tool_calls_to_report: list[dict[str, Any]] = [] - for tool_call in tool_calls: - info = extract_tool_call_info(tool_call) - if not info: - continue - call_id, func_name, func_args = info - tool_calls_to_report.append( - { - "id": call_id, - "type": "function", - "function": { - "name": func_name, - "arguments": func_args - if isinstance(func_args, str) - else json.dumps(func_args), - }, - } - ) - - if tool_calls_to_report: - assistant_message: LLMMessage = { - "role": "assistant", - "content": None, - "tool_calls": tool_calls_to_report, - } - # Preserve raw parts for Gemini compatibility - if all(type(tc).__qualname__ == "Part" for tc in tool_calls): - assistant_message["raw_tool_call_parts"] = list(tool_calls) + # Build and append assistant message with tool call reports + assistant_message, _reports = build_tool_calls_assistant_message(tool_calls) + if assistant_message: messages.append(assistant_message) - # Execute each tool call + # Execute each tool call via shared pipeline final_answer: str | None = None for tool_call in tool_calls: - info = extract_tool_call_info(tool_call) - if not info: - continue - - call_id, func_name, func_args = info - tool_calls_made.append(func_name) - - # Parse arguments - if isinstance(func_args, str): - try: - args_dict = json.loads(func_args) - except json.JSONDecodeError: - args_dict = {} - else: - args_dict = func_args - - agent_key = ( - getattr(self.agent, "key", "unknown") if self.agent else "unknown" - ) - - # Find original tool for cache_function and result_as_answer - original_tool = None - for tool in self.original_tools: - if sanitize_tool_name(tool.name) == func_name: - original_tool = tool - break - - # Check max usage count - max_usage_reached = False - if ( - original_tool - and original_tool.max_usage_count is not None - and original_tool.current_usage_count >= original_tool.max_usage_count - ): - max_usage_reached = True - - # Check cache - from_cache = False - input_str = json.dumps(args_dict) if args_dict else "" - result = "Tool not found" - - if self.tools_handler and self.tools_handler.cache: - cached_result = self.tools_handler.cache.read( - tool=func_name, input=input_str - ) - if cached_result is not None: - result = ( - str(cached_result) - if not isinstance(cached_result, str) - else cached_result - ) - from_cache = True - - # Emit tool started event - started_at = datetime.now() - crewai_event_bus.emit( - self, - event=ToolUsageStartedEvent( - tool_name=func_name, - tool_args=args_dict, - from_agent=self.agent, - from_task=self.task, - agent_key=agent_key, - ), - ) - - track_delegation_if_needed(func_name, args_dict, self.task) - - # Find structured tool for hooks - structured_tool: CrewStructuredTool | None = None - for structured in self.tools or []: - if sanitize_tool_name(structured.name) == func_name: - structured_tool = structured - break - - # Before hooks - hook_blocked = False - before_hook_context = ToolCallHookContext( - tool_name=func_name, - tool_input=args_dict, - tool=structured_tool, # type: ignore[arg-type] + call_result = execute_single_native_tool_call( + tool_call, + available_functions=self._available_functions, + original_tools=self.original_tools, + structured_tools=self.tools, + tools_handler=self.tools_handler, agent=self.agent, task=self.task, crew=self.crew, - ) - try: - for hook in get_before_tool_call_hooks(): - if hook(before_hook_context) is False: - hook_blocked = True - break - except Exception: # noqa: S110 - pass - - if hook_blocked: - result = f"Tool execution blocked by hook. Tool: {func_name}" - elif not from_cache and not max_usage_reached: - if func_name in self._available_functions: - try: - tool_func = self._available_functions[func_name] - raw_result = tool_func(**args_dict) - - # Cache result - if self.tools_handler and self.tools_handler.cache: - should_cache = True - if original_tool: - should_cache = original_tool.cache_function( - args_dict, raw_result - ) - if should_cache: - self.tools_handler.cache.add( - tool=func_name, input=input_str, output=raw_result - ) - - result = ( - str(raw_result) - if not isinstance(raw_result, str) - else raw_result - ) - except Exception as e: - result = f"Error executing tool: {e}" - if self.task: - self.task.increment_tools_errors() - crewai_event_bus.emit( - self, - event=ToolUsageErrorEvent( - tool_name=func_name, - tool_args=args_dict, - from_agent=self.agent, - from_task=self.task, - agent_key=agent_key, - error=e, - ), - ) - elif max_usage_reached and original_tool: - result = ( - f"Tool '{func_name}' has reached its usage limit of " - f"{original_tool.max_usage_count} times and cannot be used anymore." - ) - - # After hooks - after_hook_context = ToolCallHookContext( - tool_name=func_name, - tool_input=args_dict, - tool=structured_tool, # type: ignore[arg-type] - agent=self.agent, - task=self.task, - crew=self.crew, - tool_result=result, - ) - try: - for after_hook in get_after_tool_call_hooks(): - hook_result = after_hook(after_hook_context) - if hook_result is not None: - result = hook_result - after_hook_context.tool_result = result - except Exception: # noqa: S110 - pass - - # Emit tool finished event - crewai_event_bus.emit( - self, - event=ToolUsageFinishedEvent( - output=result, - tool_name=func_name, - tool_args=args_dict, - from_agent=self.agent, - from_task=self.task, - agent_key=agent_key, - started_at=started_at, - finished_at=datetime.now(), - ), + event_source=self, + printer=self._printer, + verbose=bool(self.agent and self.agent.verbose), ) - # Append tool result message - tool_message: LLMMessage = { - "role": "tool", - "tool_call_id": call_id, - "name": func_name, - "content": result, - } - messages.append(tool_message) + if call_result.func_name: + tool_calls_made.append(call_result.func_name) - if self.agent and self.agent.verbose: - cache_info = " (from cache)" if from_cache else "" - self._printer.print( - content=f"Tool {func_name} executed with result{cache_info}: {result[:200]}...", - color="green", - ) + if call_result.tool_message: + messages.append(call_result.tool_message) - # Check result_as_answer - if ( - original_tool - and hasattr(original_tool, "result_as_answer") - and original_tool.result_as_answer - ): - final_answer = result + if call_result.result_as_answer: + final_answer = call_result.result if final_answer is not None: return final_answer @@ -660,53 +451,3 @@ class StepExecutor: pass return self._i18n.retrieve("planning", "step_could_not_complete") - - # ------------------------------------------------------------------ - # Internal: Native tool support - # ------------------------------------------------------------------ - - def _check_native_tool_support(self) -> bool: - """Check if LLM supports native function calling.""" - return ( - hasattr(self.llm, "supports_function_calling") - and callable(getattr(self.llm, "supports_function_calling", None)) - and self.llm.supports_function_calling() - and bool(self.original_tools) - ) - - def _setup_native_tools(self) -> None: - """Convert tools to OpenAI schema format for native function calling.""" - if self.original_tools: - self._openai_tools, self._available_functions = ( - convert_tools_to_openai_schema(self.original_tools) - ) - - def _is_tool_call_list(self, response: list[Any]) -> bool: - """Check if a response is a list of tool calls.""" - if not response: - return False - first_item = response[0] - # OpenAI-style - if hasattr(first_item, "function") or ( - isinstance(first_item, dict) and "function" in first_item - ): - return True - # Anthropic-style (ToolUseBlock) - if ( - hasattr(first_item, "type") - and getattr(first_item, "type", None) == "tool_use" - ): - return True - if hasattr(first_item, "name") and hasattr(first_item, "input"): - return True - # Bedrock-style - if ( - isinstance(first_item, dict) - and "name" in first_item - and "input" in first_item - ): - return True - # Gemini-style - if hasattr(first_item, "function_call") and first_item.function_call: - return True - return False diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 4be4f24c3..722a8aa39 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -2,7 +2,6 @@ from __future__ import annotations import asyncio from collections.abc import Callable, Coroutine -from datetime import datetime import json import threading from typing import TYPE_CHECKING, Any, Literal, cast @@ -33,22 +32,12 @@ from crewai.events.types.observation_events import ( PlanRefinementEvent, PlanReplanTriggeredEvent, ) -from crewai.events.types.tool_usage_events import ( - ToolUsageErrorEvent, - ToolUsageFinishedEvent, - ToolUsageStartedEvent, -) from crewai.flow.flow import Flow, StateProxy, listen, or_, router, start from crewai.flow.types import FlowMethodName from crewai.hooks.llm_hooks import ( get_after_llm_call_hooks, get_before_llm_call_hooks, ) -from crewai.hooks.tool_hooks import ( - ToolCallHookContext, - get_after_tool_call_hooks, - get_before_tool_call_hooks, -) from crewai.hooks.types import ( AfterLLMCallHookCallable, AfterLLMCallHookType, @@ -56,8 +45,10 @@ from crewai.hooks.types import ( BeforeLLMCallHookType, ) from crewai.utilities.agent_utils import ( - convert_tools_to_openai_schema, + build_tool_calls_assistant_message, + check_native_tool_support, enforce_rpm_limit, + execute_single_native_tool_call, extract_tool_call_info, format_message_for_llm, get_llm_response, @@ -69,8 +60,9 @@ from crewai.utilities.agent_utils import ( has_reached_max_iterations, is_context_length_exceeded, is_inside_event_loop, + is_tool_call_list, process_llm_response, - track_delegation_if_needed, + setup_native_tools, ) from crewai.utilities.constants import TRAINING_DATA_FILE from crewai.utilities.i18n import I18N, get_i18n @@ -278,61 +270,19 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): self._flow_initialized = True def _check_native_tool_support(self) -> bool: - """Check if LLM supports native function calling. - - Returns: - True if the LLM supports native function calling and tools are available. - """ - return ( - hasattr(self.llm, "supports_function_calling") - and callable(getattr(self.llm, "supports_function_calling", None)) - and self.llm.supports_function_calling() - and bool(self.original_tools) - ) + """Check if LLM supports native function calling.""" + return check_native_tool_support(self.llm, self.original_tools) def _setup_native_tools(self) -> None: """Convert tools to OpenAI schema format for native function calling.""" if self.original_tools: - self._openai_tools, self._available_functions = ( - convert_tools_to_openai_schema(self.original_tools) + self._openai_tools, self._available_functions = setup_native_tools( + self.original_tools ) def _is_tool_call_list(self, response: list[Any]) -> bool: - """Check if a response is a list of tool calls. - - Args: - response: The response to check. - - Returns: - True if the response appears to be a list of tool calls. - """ - if not response: - return False - first_item = response[0] - # Check for OpenAI-style tool call structure - if hasattr(first_item, "function") or ( - isinstance(first_item, dict) and "function" in first_item - ): - return True - # Check for Anthropic-style tool call structure (ToolUseBlock) - if ( - hasattr(first_item, "type") - and getattr(first_item, "type", None) == "tool_use" - ): - return True - if hasattr(first_item, "name") and hasattr(first_item, "input"): - return True - # Check for Bedrock-style tool call structure (dict with name and input keys) - if ( - isinstance(first_item, dict) - and "name" in first_item - and "input" in first_item - ): - return True - # Check for Gemini-style function call (Part with function_call) - if hasattr(first_item, "function_call") and first_item.function_call: - return True - return False + """Check if a response is a list of tool calls.""" + return is_tool_call_list(response) @property def use_stop_words(self) -> bool: @@ -1157,11 +1107,9 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): role = self.agent.role if self.agent else "Assistant" goal = self.agent.goal if self.agent else "Complete tasks efficiently" - return f"""You are {role}. Your goal: {goal} - -You are executing a specific step in a multi-step plan. Focus only on completing -the current step. Use the suggested tool if one is provided. Be concise and -provide clear results that can be used by subsequent steps.""" + return self._i18n.retrieve("planning", "todo_system_prompt").format( + role=role, goal=goal, + ) @router("parallel_todos_complete") def after_parallel_execution( @@ -1509,254 +1457,40 @@ provide clear results that can be used by subsequent steps.""" if not self.state.pending_tool_calls: return "native_tool_completed" - # Group all tool calls into a single assistant message - tool_calls_to_report = [] - for tool_call in self.state.pending_tool_calls: - info = extract_tool_call_info(tool_call) - if not info: - continue - - call_id, func_name, func_args = info - tool_calls_to_report.append( - { - "id": call_id, - "type": "function", - "function": { - "name": func_name, - "arguments": func_args - if isinstance(func_args, str) - else json.dumps(func_args), - }, - } - ) - - if tool_calls_to_report: - assistant_message: LLMMessage = { - "role": "assistant", - "content": None, - "tool_calls": tool_calls_to_report, - } - if all( - type(tc).__qualname__ == "Part" for tc in self.state.pending_tool_calls - ): - assistant_message["raw_tool_call_parts"] = list( - self.state.pending_tool_calls - ) + # Build and append assistant message with tool call reports + assistant_message, _reports = build_tool_calls_assistant_message( + self.state.pending_tool_calls + ) + if assistant_message: self.state.messages.append(assistant_message) - # Now execute each tool + # Execute each tool call via shared pipeline while self.state.pending_tool_calls: tool_call = self.state.pending_tool_calls.pop(0) - info = extract_tool_call_info(tool_call) - if not info: - continue - call_id, func_name, func_args = info - - # Parse arguments - if isinstance(func_args, str): - try: - args_dict = json.loads(func_args) - except json.JSONDecodeError: - args_dict = {} - else: - args_dict = func_args - - # Get agent_key for event tracking - agent_key = ( - getattr(self.agent, "key", "unknown") if self.agent else "unknown" - ) - - # Find original tool by matching sanitized name (needed for cache_function and result_as_answer) - original_tool = None - for tool in self.original_tools or []: - if sanitize_tool_name(tool.name) == func_name: - original_tool = tool - break - - # Check if tool has reached max usage count - max_usage_reached = False - if ( - original_tool - and original_tool.max_usage_count is not None - and original_tool.current_usage_count >= original_tool.max_usage_count - ): - max_usage_reached = True - - # Check cache before executing - from_cache = False - input_str = json.dumps(args_dict) if args_dict else "" - if self.tools_handler and self.tools_handler.cache: - cached_result = self.tools_handler.cache.read( - tool=func_name, input=input_str - ) - if cached_result is not None: - result = ( - str(cached_result) - if not isinstance(cached_result, str) - else cached_result - ) - from_cache = True - - # Emit tool usage started event - started_at = datetime.now() - crewai_event_bus.emit( - self, - event=ToolUsageStartedEvent( - tool_name=func_name, - tool_args=args_dict, - from_agent=self.agent, - from_task=self.task, - agent_key=agent_key, - ), - ) - error_event_emitted = False - - track_delegation_if_needed(func_name, args_dict, self.task) - - structured_tool: CrewStructuredTool | None = None - for structured in self.tools or []: - if sanitize_tool_name(structured.name) == func_name: - structured_tool = structured - break - - hook_blocked = False - before_hook_context = ToolCallHookContext( - tool_name=func_name, - tool_input=args_dict, - tool=structured_tool, # type: ignore[arg-type] + call_result = execute_single_native_tool_call( + tool_call, + available_functions=self._available_functions, + original_tools=self.original_tools, + structured_tools=self.tools, + tools_handler=self.tools_handler, agent=self.agent, task=self.task, crew=self.crew, + event_source=self, + printer=self._printer, + verbose=bool(self.agent and self.agent.verbose), ) - before_hooks = get_before_tool_call_hooks() - try: - for hook in before_hooks: - hook_result = hook(before_hook_context) - if hook_result is False: - hook_blocked = True - break - except Exception as hook_error: - if self.agent.verbose: - self._printer.print( - content=f"Error in before_tool_call hook: {hook_error}", - color="red", - ) - if hook_blocked: - result = f"Tool execution blocked by hook. Tool: {func_name}" - elif not from_cache and not max_usage_reached: - result = "Tool not found" - if func_name in self._available_functions: - try: - tool_func = self._available_functions[func_name] - raw_result = tool_func(**args_dict) + if call_result.tool_message: + self.state.messages.append(call_result.tool_message) - # Add to cache after successful execution (before string conversion) - if self.tools_handler and self.tools_handler.cache: - should_cache = True - if original_tool: - should_cache = original_tool.cache_function( - args_dict, raw_result - ) - if should_cache: - self.tools_handler.cache.add( - tool=func_name, input=input_str, output=raw_result - ) - - # Convert to string for message - result = ( - str(raw_result) - if not isinstance(raw_result, str) - else raw_result - ) - except Exception as e: - result = f"Error executing tool: {e}" - if self.task: - self.task.increment_tools_errors() - # Emit tool usage error event - crewai_event_bus.emit( - self, - event=ToolUsageErrorEvent( - tool_name=func_name, - tool_args=args_dict, - from_agent=self.agent, - from_task=self.task, - agent_key=agent_key, - error=e, - ), - ) - error_event_emitted = True - elif max_usage_reached and original_tool: - # Return error message when max usage limit is reached - result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore." - - # Execute after_tool_call hooks (even if blocked, to allow logging/monitoring) - after_hook_context = ToolCallHookContext( - tool_name=func_name, - tool_input=args_dict, - tool=structured_tool, # type: ignore[arg-type] - agent=self.agent, - task=self.task, - crew=self.crew, - tool_result=result, - ) - after_hooks = get_after_tool_call_hooks() - try: - for after_hook in after_hooks: - after_hook_result = after_hook(after_hook_context) - if after_hook_result is not None: - result = after_hook_result - after_hook_context.tool_result = result - except Exception as hook_error: - if self.agent.verbose: - self._printer.print( - content=f"Error in after_tool_call hook: {hook_error}", - color="red", - ) - - if not error_event_emitted: - crewai_event_bus.emit( - self, - event=ToolUsageFinishedEvent( - output=result, - tool_name=func_name, - tool_args=args_dict, - from_agent=self.agent, - from_task=self.task, - agent_key=agent_key, - started_at=started_at, - finished_at=datetime.now(), - ), - ) - - # Append tool result message - tool_message: LLMMessage = { - "role": "tool", - "tool_call_id": call_id, - "name": func_name, - "content": result, - } - self.state.messages.append(tool_message) - - # Log the tool execution - if self.agent and self.agent.verbose: - cache_info = " (from cache)" if from_cache else "" - self._printer.print( - content=f"Tool {func_name} executed with result{cache_info}: {result[:200]}...", - color="green", - ) - - if ( - original_tool - and hasattr(original_tool, "result_as_answer") - and original_tool.result_as_answer - ): + if call_result.result_as_answer: # Set the result as the final answer self.state.current_answer = AgentFinish( thought="Tool result is the final answer", - output=result, - text=result, + output=call_result.result, + text=call_result.result, ) self.state.is_finished = True return "tool_result_is_final" @@ -2152,17 +1886,14 @@ provide clear results that can be used by subsequent steps.""" # Build synthesis prompt role = self.agent.role if self.agent else "Assistant" - system_prompt = ( - f"You are {role}. You have completed a multi-step task. " - "Synthesize the results from all steps into a single, coherent " - "final response that directly addresses the original task. " - "Do NOT list step numbers or say 'Step 1 result'. " - "Produce a clean, polished answer as if you did it all at once." - ) - user_prompt = ( - f"## Original Task\n{task_description}\n\n" - f"## Results from each step\n{combined_steps}\n\n" - "Synthesize these results into a single, coherent final answer." + system_prompt = self._i18n.retrieve( + "planning", "synthesis_system_prompt" + ).format(role=role) + user_prompt = self._i18n.retrieve( + "planning", "synthesis_user_prompt" + ).format( + task_description=task_description, + combined_steps=combined_steps, ) try: @@ -2390,18 +2121,11 @@ provide clear results that can be used by subsequent steps.""" self.task.description if self.task else getattr(self, "_kickoff_input", "") ) - return f"""{original} + enhancement = self._i18n.retrieve( + "planning", "replan_enhancement_prompt" + ).format(previous_context=previous_context) -IMPORTANT: Previous execution attempt did not fully succeed. Please create a revised plan -that accounts for the following context from the previous attempt: - -{previous_context} - -Consider: -1. What steps succeeded and can be built upon -2. What steps failed and why they might have failed -3. Alternative approaches that might work better -4. Whether dependencies need to be restructured""" + return f"{original}{enhancement}" @router("needs_replan") def handle_replan(self) -> Literal["has_todos", "no_todos"]: diff --git a/lib/crewai/src/crewai/translations/en.json b/lib/crewai/src/crewai/translations/en.json index 25f7c08a7..a68080521 100644 --- a/lib/crewai/src/crewai/translations/en.json +++ b/lib/crewai/src/crewai/translations/en.json @@ -80,6 +80,10 @@ "step_executor_complete_step": "\nComplete this step and provide your result.", "step_executor_force_final_answer": "You have used the maximum number of tool calls for this step. Based on the information gathered so far, provide your final answer now.", "step_executor_force_final_answer_suffix": "\n\nFinal Answer: ", - "step_could_not_complete": "Step could not be completed within the iteration limit." + "step_could_not_complete": "Step could not be completed within the iteration limit.", + "todo_system_prompt": "You are {role}. Your goal: {goal}\n\nYou are executing a specific step in a multi-step plan. Focus only on completing the current step. Use the suggested tool if one is provided. Be concise and provide clear results that can be used by subsequent steps.", + "synthesis_system_prompt": "You are {role}. You have completed a multi-step task. Synthesize the results from all steps into a single, coherent final response that directly addresses the original task. Do NOT list step numbers or say 'Step 1 result'. Produce a clean, polished answer as if you did it all at once.", + "synthesis_user_prompt": "## Original Task\n{task_description}\n\n## Results from each step\n{combined_steps}\n\nSynthesize these results into a single, coherent final answer.", + "replan_enhancement_prompt": "\n\nIMPORTANT: Previous execution attempt did not fully succeed. Please create a revised plan that accounts for the following context from the previous attempt:\n\n{previous_context}\n\nConsider:\n1. What steps succeeded and can be built upon\n2. What steps failed and why they might have failed\n3. Alternative approaches that might work better\n4. Whether dependencies need to be restructured" } } diff --git a/lib/crewai/src/crewai/utilities/agent_utils.py b/lib/crewai/src/crewai/utilities/agent_utils.py index 4ded93505..149ddf00e 100644 --- a/lib/crewai/src/crewai/utilities/agent_utils.py +++ b/lib/crewai/src/crewai/utilities/agent_utils.py @@ -2,6 +2,8 @@ from __future__ import annotations import asyncio from collections.abc import Callable, Sequence +from dataclasses import dataclass, field +from datetime import datetime import json import re from typing import TYPE_CHECKING, Any, Final, Literal, TypedDict @@ -37,6 +39,7 @@ from crewai.utilities.types import LLMMessage if TYPE_CHECKING: from crewai.agent import Agent from crewai.agents.crew_agent_executor import CrewAgentExecutor + from crewai.agents.tools_handler import ToolsHandler from crewai.experimental.agent_executor import AgentExecutor from crewai.lite_agent import LiteAgent from crewai.llm import LLM @@ -322,6 +325,66 @@ def enforce_rpm_limit( request_within_rpm_limit() +def _prepare_llm_call( + executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None, + messages: list[LLMMessage], + printer: Printer, + verbose: bool = True, +) -> list[LLMMessage]: + """Shared pre-call logic: run before hooks and resolve messages. + + Args: + executor_context: Optional executor context for hook invocation. + messages: The messages to send to the LLM. + printer: Printer instance for output. + verbose: Whether to print output. + + Returns: + The resolved messages list (may come from executor_context). + + Raises: + ValueError: If a before hook blocks the call. + """ + if executor_context is not None: + if not _setup_before_llm_call_hooks(executor_context, printer, verbose=verbose): + raise ValueError("LLM call blocked by before_llm_call hook") + messages = executor_context.messages + return messages + + +def _validate_and_finalize_llm_response( + answer: Any, + executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None, + printer: Printer, + verbose: bool = True, +) -> str | BaseModel | Any: + """Shared post-call logic: validate response and run after hooks. + + Args: + answer: The raw LLM response. + executor_context: Optional executor context for hook invocation. + printer: Printer instance for output. + verbose: Whether to print output. + + Returns: + The potentially modified response. + + Raises: + ValueError: If the response is None or empty. + """ + if not answer: + if verbose: + printer.print( + content="Received None or empty response from LLM call.", + color="red", + ) + raise ValueError("Invalid response from LLM call - None or empty.") + + return _setup_after_llm_call_hooks( + executor_context, answer, printer, verbose=verbose + ) + + def get_llm_response( llm: LLM | BaseLLM, messages: list[LLMMessage], @@ -358,11 +421,7 @@ def get_llm_response( Exception: If an error occurs. ValueError: If the response is None or empty. """ - - if executor_context is not None: - if not _setup_before_llm_call_hooks(executor_context, printer, verbose=verbose): - raise ValueError("LLM call blocked by before_llm_call hook") - messages = executor_context.messages + messages = _prepare_llm_call(executor_context, messages, printer, verbose=verbose) try: answer = llm.call( @@ -376,16 +435,9 @@ def get_llm_response( ) except Exception as e: raise e - if not answer: - if verbose: - printer.print( - content="Received None or empty response from LLM call.", - color="red", - ) - raise ValueError("Invalid response from LLM call - None or empty.") - return _setup_after_llm_call_hooks( - executor_context, answer, printer, verbose=verbose + return _validate_and_finalize_llm_response( + answer, executor_context, printer, verbose=verbose ) @@ -415,6 +467,7 @@ async def aget_llm_response( from_agent: Optional agent context for the LLM call. response_model: Optional Pydantic model for structured outputs. executor_context: Optional executor context for hook invocation. + verbose: Whether to print output. Returns: The response from the LLM as a string, Pydantic model (when response_model is provided), @@ -424,10 +477,7 @@ async def aget_llm_response( Exception: If an error occurs. ValueError: If the response is None or empty. """ - if executor_context is not None: - if not _setup_before_llm_call_hooks(executor_context, printer, verbose=verbose): - raise ValueError("LLM call blocked by before_llm_call hook") - messages = executor_context.messages + messages = _prepare_llm_call(executor_context, messages, printer, verbose=verbose) try: answer = await llm.acall( @@ -441,16 +491,9 @@ async def aget_llm_response( ) except Exception as e: raise e - if not answer: - if verbose: - printer.print( - content="Received None or empty response from LLM call.", - color="red", - ) - raise ValueError("Invalid response from LLM call - None or empty.") - return _setup_after_llm_call_hooks( - executor_context, answer, printer, verbose=verbose + return _validate_and_finalize_llm_response( + answer, executor_context, printer, verbose=verbose ) @@ -939,6 +982,385 @@ def extract_tool_call_info( return None +def is_tool_call_list(response: list[Any]) -> bool: + """Check if a response from the LLM is a list of tool calls. + + Supports OpenAI, Anthropic, Bedrock, and Gemini formats. + + Args: + response: The response to check. + + Returns: + True if the response appears to be a list of tool calls. + """ + if not response: + return False + first_item = response[0] + # OpenAI-style + if hasattr(first_item, "function") or ( + isinstance(first_item, dict) and "function" in first_item + ): + return True + # Anthropic-style (ToolUseBlock) + if ( + hasattr(first_item, "type") + and getattr(first_item, "type", None) == "tool_use" + ): + return True + if hasattr(first_item, "name") and hasattr(first_item, "input"): + return True + # Bedrock-style + if ( + isinstance(first_item, dict) + and "name" in first_item + and "input" in first_item + ): + return True + # Gemini-style + if hasattr(first_item, "function_call") and first_item.function_call: + return True + return False + + +def check_native_tool_support(llm: Any, original_tools: list[BaseTool] | None) -> bool: + """Check if the LLM supports native function calling and tools are available. + + Args: + llm: The LLM instance. + original_tools: Original BaseTool instances. + + Returns: + True if native function calling is supported and tools exist. + """ + return ( + hasattr(llm, "supports_function_calling") + and callable(getattr(llm, "supports_function_calling", None)) + and llm.supports_function_calling() + and bool(original_tools) + ) + + +def setup_native_tools( + original_tools: list[BaseTool], +) -> tuple[list[dict[str, Any]], dict[str, Callable[..., Any]]]: + """Convert tools to OpenAI schema format for native function calling. + + Args: + original_tools: Original BaseTool instances. + + Returns: + Tuple of (openai_tools_schema, available_functions_dict). + """ + return convert_tools_to_openai_schema(original_tools) + + +def build_tool_calls_assistant_message( + tool_calls: list[Any], +) -> tuple[LLMMessage | None, list[dict[str, Any]]]: + """Build an assistant message containing tool call reports. + + Extracts info from each tool call, builds the standard assistant message + format, and preserves raw Gemini parts when applicable. + + Args: + tool_calls: Raw tool call objects from the LLM response. + + Returns: + Tuple of (assistant_message, tool_calls_to_report). + assistant_message is None if no valid tool calls found. + """ + tool_calls_to_report: list[dict[str, Any]] = [] + for tool_call in tool_calls: + info = extract_tool_call_info(tool_call) + if not info: + continue + call_id, func_name, func_args = info + tool_calls_to_report.append( + { + "id": call_id, + "type": "function", + "function": { + "name": func_name, + "arguments": func_args + if isinstance(func_args, str) + else json.dumps(func_args), + }, + } + ) + + if not tool_calls_to_report: + return None, [] + + assistant_message: LLMMessage = { + "role": "assistant", + "content": None, + "tool_calls": tool_calls_to_report, + } + # Preserve raw parts for Gemini compatibility + if all(type(tc).__qualname__ == "Part" for tc in tool_calls): + assistant_message["raw_tool_call_parts"] = list(tool_calls) + + return assistant_message, tool_calls_to_report + + +@dataclass +class NativeToolCallResult: + """Result from executing a single native tool call.""" + + call_id: str + func_name: str + result: str + from_cache: bool = False + result_as_answer: bool = False + tool_message: LLMMessage = field(default_factory=dict) # type: ignore[assignment] + + +def execute_single_native_tool_call( + tool_call: Any, + *, + available_functions: dict[str, Callable[..., Any]], + original_tools: list[BaseTool], + structured_tools: list[CrewStructuredTool] | None, + tools_handler: ToolsHandler | None, + agent: Agent | None, + task: Task | None, + crew: Any | None, + event_source: Any, + printer: Printer | None = None, + verbose: bool = False, +) -> NativeToolCallResult: + """Execute a single native tool call with full lifecycle management. + + Handles: arg parsing, tool lookup, max-usage check, cache read/write, + before/after hooks, event emission, and result_as_answer detection. + + Args: + tool_call: Raw tool call object from the LLM. + available_functions: Map of sanitized tool name -> callable. + original_tools: Original BaseTool list (for cache_function, result_as_answer). + structured_tools: Structured tools list (for hook context). + tools_handler: Optional handler with cache. + agent: The agent instance. + task: The current task. + crew: The crew instance. + event_source: The object to use as event emitter source. + printer: Optional printer for verbose logging. + verbose: Whether to print verbose output. + + Returns: + NativeToolCallResult with all execution details. + """ + from crewai.events.event_bus import crewai_event_bus + from crewai.events.types.tool_usage_events import ( + ToolUsageErrorEvent, + ToolUsageFinishedEvent, + ToolUsageStartedEvent, + ) + from crewai.hooks.tool_hooks import ( + ToolCallHookContext, + get_after_tool_call_hooks, + get_before_tool_call_hooks, + ) + + info = extract_tool_call_info(tool_call) + if not info: + return NativeToolCallResult( + call_id="", func_name="", result="Unrecognized tool call format" + ) + + call_id, func_name, func_args = info + + # Parse arguments + if isinstance(func_args, str): + try: + args_dict = json.loads(func_args) + except json.JSONDecodeError: + args_dict = {} + else: + args_dict = func_args + + agent_key = getattr(agent, "key", "unknown") if agent else "unknown" + + # Find original tool for cache_function and result_as_answer + original_tool: BaseTool | None = None + for tool in original_tools: + if sanitize_tool_name(tool.name) == func_name: + original_tool = tool + break + + # Check max usage count + max_usage_reached = False + if ( + original_tool + and original_tool.max_usage_count is not None + and original_tool.current_usage_count >= original_tool.max_usage_count + ): + max_usage_reached = True + + # Check cache + from_cache = False + input_str = json.dumps(args_dict) if args_dict else "" + result = "Tool not found" + + if tools_handler and tools_handler.cache: + cached_result = tools_handler.cache.read(tool=func_name, input=input_str) + if cached_result is not None: + result = ( + str(cached_result) if not isinstance(cached_result, str) else cached_result + ) + from_cache = True + + # Emit tool started event + started_at = datetime.now() + crewai_event_bus.emit( + event_source, + event=ToolUsageStartedEvent( + tool_name=func_name, + tool_args=args_dict, + from_agent=agent, + from_task=task, + agent_key=agent_key, + ), + ) + + track_delegation_if_needed(func_name, args_dict, task) + + # Find structured tool for hooks + structured_tool: CrewStructuredTool | None = None + for structured in structured_tools or []: + if sanitize_tool_name(structured.name) == func_name: + structured_tool = structured + break + + # Before hooks + hook_blocked = False + before_hook_context = ToolCallHookContext( + tool_name=func_name, + tool_input=args_dict, + tool=structured_tool, # type: ignore[arg-type] + agent=agent, + task=task, + crew=crew, + ) + try: + for hook in get_before_tool_call_hooks(): + if hook(before_hook_context) is False: + hook_blocked = True + break + except Exception: # noqa: S110 + pass + + error_event_emitted = False + if hook_blocked: + result = f"Tool execution blocked by hook. Tool: {func_name}" + elif not from_cache and not max_usage_reached: + if func_name in available_functions: + try: + tool_func = available_functions[func_name] + raw_result = tool_func(**args_dict) + + # Cache result + if tools_handler and tools_handler.cache: + should_cache = True + if original_tool: + should_cache = original_tool.cache_function(args_dict, raw_result) + if should_cache: + tools_handler.cache.add( + tool=func_name, input=input_str, output=raw_result + ) + + result = ( + str(raw_result) if not isinstance(raw_result, str) else raw_result + ) + except Exception as e: + result = f"Error executing tool: {e}" + if task: + task.increment_tools_errors() + crewai_event_bus.emit( + event_source, + event=ToolUsageErrorEvent( + tool_name=func_name, + tool_args=args_dict, + from_agent=agent, + from_task=task, + agent_key=agent_key, + error=e, + ), + ) + error_event_emitted = True + elif max_usage_reached and original_tool: + result = ( + f"Tool '{func_name}' has reached its usage limit of " + f"{original_tool.max_usage_count} times and cannot be used anymore." + ) + + # After hooks + after_hook_context = ToolCallHookContext( + tool_name=func_name, + tool_input=args_dict, + tool=structured_tool, # type: ignore[arg-type] + agent=agent, + task=task, + crew=crew, + tool_result=result, + ) + try: + for after_hook in get_after_tool_call_hooks(): + hook_result = after_hook(after_hook_context) + if hook_result is not None: + result = hook_result + after_hook_context.tool_result = result + except Exception: # noqa: S110 + pass + + # Emit tool finished event (only if error event wasn't already emitted) + if not error_event_emitted: + crewai_event_bus.emit( + event_source, + event=ToolUsageFinishedEvent( + output=result, + tool_name=func_name, + tool_args=args_dict, + from_agent=agent, + from_task=task, + agent_key=agent_key, + started_at=started_at, + finished_at=datetime.now(), + ), + ) + + # Build tool result message + tool_message: LLMMessage = { + "role": "tool", + "tool_call_id": call_id, + "name": func_name, + "content": result, + } + + if verbose and printer: + cache_info = " (from cache)" if from_cache else "" + printer.print( + content=f"Tool {func_name} executed with result{cache_info}: {result[:200]}...", + color="green", + ) + + # Check result_as_answer + is_result_as_answer = bool( + original_tool + and hasattr(original_tool, "result_as_answer") + and original_tool.result_as_answer + ) + + return NativeToolCallResult( + call_id=call_id, + func_name=func_name, + result=result, + from_cache=from_cache, + result_as_answer=is_result_as_answer, + tool_message=tool_message, + ) + + def _setup_before_llm_call_hooks( executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None, printer: Printer,