diff --git a/lib/crewai/src/crewai/tools/tool_usage.py b/lib/crewai/src/crewai/tools/tool_usage.py index ab3d0fc25..dc46267bc 100644 --- a/lib/crewai/src/crewai/tools/tool_usage.py +++ b/lib/crewai/src/crewai/tools/tool_usage.py @@ -241,6 +241,9 @@ class ToolUsage: if self.task: self.task.increment_tools_errors() + started_at = time.time() + started_event_emitted = False + if self.agent: event_data = { "agent_key": self.agent.key, @@ -258,151 +261,160 @@ class ToolUsage: event_data["task_name"] = self.task.name or self.task.description event_data["task_id"] = str(self.task.id) crewai_event_bus.emit(self, ToolUsageStartedEvent(**event_data)) + started_event_emitted = True - started_at = time.time() from_cache = False result = None # type: ignore + should_retry = False + available_tool = None - if self.tools_handler and self.tools_handler.cache: - input_str = "" - if calling.arguments: - if isinstance(calling.arguments, dict): - input_str = json.dumps(calling.arguments) - else: - input_str = str(calling.arguments) + try: + if self.tools_handler and self.tools_handler.cache: + input_str = "" + if calling.arguments: + if isinstance(calling.arguments, dict): + input_str = json.dumps(calling.arguments) + else: + input_str = str(calling.arguments) - result = self.tools_handler.cache.read( - tool=calling.tool_name, input=input_str - ) # type: ignore - from_cache = result is not None + result = self.tools_handler.cache.read( + tool=calling.tool_name, input=input_str + ) # type: ignore + from_cache = result is not None - available_tool = next( - ( - available_tool - for available_tool in self.tools - if available_tool.name == tool.name - ), - None, - ) + available_tool = next( + ( + available_tool + for available_tool in self.tools + if available_tool.name == tool.name + ), + None, + ) - usage_limit_error = self._check_usage_limit(available_tool, tool.name) - if usage_limit_error: - try: + usage_limit_error = self._check_usage_limit(available_tool, tool.name) + if usage_limit_error: result = usage_limit_error self._telemetry.tool_usage_error(llm=self.function_calling_llm) - return self._format_result(result=result) - except Exception: - if self.task: - self.task.increment_tools_errors() - - if result is None: - try: - if calling.tool_name in [ - "Delegate work to coworker", - "Ask question to coworker", - ]: - coworker = ( - calling.arguments.get("coworker") if calling.arguments else None - ) - if self.task: - self.task.increment_delegations(coworker) - - if calling.arguments: - try: - acceptable_args = tool.args_schema.model_json_schema()[ - "properties" - ].keys() - arguments = { - k: v - for k, v in calling.arguments.items() - if k in acceptable_args - } - arguments = self._add_fingerprint_metadata(arguments) - result = await tool.ainvoke(input=arguments) - except Exception: - arguments = calling.arguments - arguments = self._add_fingerprint_metadata(arguments) - result = await tool.ainvoke(input=arguments) - else: - arguments = self._add_fingerprint_metadata({}) - result = await tool.ainvoke(input=arguments) - except Exception as e: - self.on_tool_error(tool=tool, tool_calling=calling, e=e) - self._run_attempts += 1 - if self._run_attempts > self._max_parsing_attempts: - self._telemetry.tool_usage_error(llm=self.function_calling_llm) - error_message = self._i18n.errors("tool_usage_exception").format( - error=e, tool=tool.name, tool_inputs=tool.description - ) - error = ToolUsageError( - f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}" - ).message - if self.task: - self.task.increment_tools_errors() - if self.agent and self.agent.verbose: - self._printer.print( - content=f"\n\n{error_message}\n", color="red" + result = self._format_result(result=result) + # Don't return early - fall through to finally block + elif result is None: + try: + if calling.tool_name in [ + "Delegate work to coworker", + "Ask question to coworker", + ]: + coworker = ( + calling.arguments.get("coworker") + if calling.arguments + else None ) - return error + if self.task: + self.task.increment_delegations(coworker) - if self.task: - self.task.increment_tools_errors() - return await self.ause(calling=calling, tool_string=tool_string) + if calling.arguments: + try: + acceptable_args = tool.args_schema.model_json_schema()[ + "properties" + ].keys() + arguments = { + k: v + for k, v in calling.arguments.items() + if k in acceptable_args + } + arguments = self._add_fingerprint_metadata(arguments) + result = await tool.ainvoke(input=arguments) + except Exception: + arguments = calling.arguments + arguments = self._add_fingerprint_metadata(arguments) + result = await tool.ainvoke(input=arguments) + else: + arguments = self._add_fingerprint_metadata({}) + result = await tool.ainvoke(input=arguments) - if self.tools_handler: - should_cache = True - if ( - hasattr(available_tool, "cache_function") - and available_tool.cache_function - ): - should_cache = available_tool.cache_function( - calling.arguments, result + if self.tools_handler: + should_cache = True + if ( + hasattr(available_tool, "cache_function") + and available_tool.cache_function + ): + should_cache = available_tool.cache_function( + calling.arguments, result + ) + + self.tools_handler.on_tool_use( + calling=calling, output=result, should_cache=should_cache + ) + + self._telemetry.tool_usage( + llm=self.function_calling_llm, + tool_name=tool.name, + attempts=self._run_attempts, ) + result = self._format_result(result=result) + data = { + "result": result, + "tool_name": tool.name, + "tool_args": calling.arguments, + } - self.tools_handler.on_tool_use( - calling=calling, output=result, should_cache=should_cache + if ( + hasattr(available_tool, "result_as_answer") + and available_tool.result_as_answer + ): + result_as_answer = available_tool.result_as_answer + data["result_as_answer"] = result_as_answer + + if self.agent and hasattr(self.agent, "tools_results"): + self.agent.tools_results.append(data) + + if available_tool and hasattr( + available_tool, "current_usage_count" + ): + available_tool.current_usage_count += 1 + if ( + hasattr(available_tool, "max_usage_count") + and available_tool.max_usage_count is not None + ): + self._printer.print( + content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}", + color="blue", + ) + + except Exception as e: + self.on_tool_error(tool=tool, tool_calling=calling, e=e) + self._run_attempts += 1 + if self._run_attempts > self._max_parsing_attempts: + self._telemetry.tool_usage_error(llm=self.function_calling_llm) + error_message = self._i18n.errors( + "tool_usage_exception" + ).format(error=e, tool=tool.name, tool_inputs=tool.description) + result = ToolUsageError( + f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}" + ).message + if self.task: + self.task.increment_tools_errors() + if self.agent and self.agent.verbose: + self._printer.print( + content=f"\n\n{error_message}\n", color="red" + ) + else: + if self.task: + self.task.increment_tools_errors() + should_retry = True + + finally: + if started_event_emitted: + self.on_tool_use_finished( + tool=tool, + tool_calling=calling, + from_cache=from_cache, + started_at=started_at, + result=result, ) - self._telemetry.tool_usage( - llm=self.function_calling_llm, - tool_name=tool.name, - attempts=self._run_attempts, - ) - result = self._format_result(result=result) - data = { - "result": result, - "tool_name": tool.name, - "tool_args": calling.arguments, - } - - self.on_tool_use_finished( - tool=tool, - tool_calling=calling, - from_cache=from_cache, - started_at=started_at, - result=result, - ) - - if ( - hasattr(available_tool, "result_as_answer") - and available_tool.result_as_answer # type: ignore - ): - result_as_answer = available_tool.result_as_answer # type: ignore - data["result_as_answer"] = result_as_answer # type: ignore - - if self.agent and hasattr(self.agent, "tools_results"): - self.agent.tools_results.append(data) - - if available_tool and hasattr(available_tool, "current_usage_count"): - available_tool.current_usage_count += 1 - if ( - hasattr(available_tool, "max_usage_count") - and available_tool.max_usage_count is not None - ): - self._printer.print( - content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}", - color="blue", - ) + # Handle retry after finally block ensures finished event was emitted + if should_retry: + return await self.ause(calling=calling, tool_string=tool_string) return result @@ -412,6 +424,7 @@ class ToolUsage: tool: CrewStructuredTool, calling: ToolCalling | InstructorToolCalling, ) -> str: + # Repeated usage check happens before event emission - safe to return early if self._check_tool_repeated_usage(calling=calling): try: result = self._i18n.errors("task_repeated_usage").format( @@ -428,6 +441,9 @@ class ToolUsage: if self.task: self.task.increment_tools_errors() + started_at = time.time() + started_event_emitted = False + if self.agent: event_data = { "agent_key": self.agent.key, @@ -446,155 +462,160 @@ class ToolUsage: event_data["task_name"] = self.task.name or self.task.description event_data["task_id"] = str(self.task.id) crewai_event_bus.emit(self, ToolUsageStartedEvent(**event_data)) + started_event_emitted = True - started_at = time.time() from_cache = False result = None # type: ignore + should_retry = False + available_tool = None - if self.tools_handler and self.tools_handler.cache: - input_str = "" - if calling.arguments: - if isinstance(calling.arguments, dict): - import json + try: + if self.tools_handler and self.tools_handler.cache: + input_str = "" + if calling.arguments: + if isinstance(calling.arguments, dict): + input_str = json.dumps(calling.arguments) + else: + input_str = str(calling.arguments) - input_str = json.dumps(calling.arguments) - else: - input_str = str(calling.arguments) + result = self.tools_handler.cache.read( + tool=calling.tool_name, input=input_str + ) # type: ignore + from_cache = result is not None - result = self.tools_handler.cache.read( - tool=calling.tool_name, input=input_str - ) # type: ignore - from_cache = result is not None + available_tool = next( + ( + available_tool + for available_tool in self.tools + if available_tool.name == tool.name + ), + None, + ) - available_tool = next( - ( - available_tool - for available_tool in self.tools - if available_tool.name == tool.name - ), - None, - ) - - usage_limit_error = self._check_usage_limit(available_tool, tool.name) - if usage_limit_error: - try: + usage_limit_error = self._check_usage_limit(available_tool, tool.name) + if usage_limit_error: result = usage_limit_error self._telemetry.tool_usage_error(llm=self.function_calling_llm) - return self._format_result(result=result) - except Exception: - if self.task: - self.task.increment_tools_errors() - - if result is None: - try: - if calling.tool_name in [ - "Delegate work to coworker", - "Ask question to coworker", - ]: - coworker = ( - calling.arguments.get("coworker") if calling.arguments else None - ) - if self.task: - self.task.increment_delegations(coworker) - - if calling.arguments: - try: - acceptable_args = tool.args_schema.model_json_schema()[ - "properties" - ].keys() - arguments = { - k: v - for k, v in calling.arguments.items() - if k in acceptable_args - } - # Add fingerprint metadata if available - arguments = self._add_fingerprint_metadata(arguments) - result = tool.invoke(input=arguments) - except Exception: - arguments = calling.arguments - # Add fingerprint metadata if available - arguments = self._add_fingerprint_metadata(arguments) - result = tool.invoke(input=arguments) - else: - # Add fingerprint metadata even to empty arguments - arguments = self._add_fingerprint_metadata({}) - result = tool.invoke(input=arguments) - except Exception as e: - self.on_tool_error(tool=tool, tool_calling=calling, e=e) - self._run_attempts += 1 - if self._run_attempts > self._max_parsing_attempts: - self._telemetry.tool_usage_error(llm=self.function_calling_llm) - error_message = self._i18n.errors("tool_usage_exception").format( - error=e, tool=tool.name, tool_inputs=tool.description - ) - error = ToolUsageError( - f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}" - ).message - if self.task: - self.task.increment_tools_errors() - if self.agent and self.agent.verbose: - self._printer.print( - content=f"\n\n{error_message}\n", color="red" + result = self._format_result(result=result) + # Don't return early - fall through to finally block + elif result is None: + try: + if calling.tool_name in [ + "Delegate work to coworker", + "Ask question to coworker", + ]: + coworker = ( + calling.arguments.get("coworker") + if calling.arguments + else None ) - return error + if self.task: + self.task.increment_delegations(coworker) - if self.task: - self.task.increment_tools_errors() - return self.use(calling=calling, tool_string=tool_string) + if calling.arguments: + try: + acceptable_args = tool.args_schema.model_json_schema()[ + "properties" + ].keys() + arguments = { + k: v + for k, v in calling.arguments.items() + if k in acceptable_args + } + arguments = self._add_fingerprint_metadata(arguments) + result = tool.invoke(input=arguments) + except Exception: + arguments = calling.arguments + arguments = self._add_fingerprint_metadata(arguments) + result = tool.invoke(input=arguments) + else: + arguments = self._add_fingerprint_metadata({}) + result = tool.invoke(input=arguments) - if self.tools_handler: - should_cache = True - if ( - hasattr(available_tool, "cache_function") - and available_tool.cache_function - ): - should_cache = available_tool.cache_function( - calling.arguments, result + if self.tools_handler: + should_cache = True + if ( + hasattr(available_tool, "cache_function") + and available_tool.cache_function + ): + should_cache = available_tool.cache_function( + calling.arguments, result + ) + + self.tools_handler.on_tool_use( + calling=calling, output=result, should_cache=should_cache + ) + + self._telemetry.tool_usage( + llm=self.function_calling_llm, + tool_name=tool.name, + attempts=self._run_attempts, ) + result = self._format_result(result=result) + data = { + "result": result, + "tool_name": tool.name, + "tool_args": calling.arguments, + } - self.tools_handler.on_tool_use( - calling=calling, output=result, should_cache=should_cache + if ( + hasattr(available_tool, "result_as_answer") + and available_tool.result_as_answer + ): + result_as_answer = available_tool.result_as_answer + data["result_as_answer"] = result_as_answer + + if self.agent and hasattr(self.agent, "tools_results"): + self.agent.tools_results.append(data) + + if available_tool and hasattr( + available_tool, "current_usage_count" + ): + available_tool.current_usage_count += 1 + if ( + hasattr(available_tool, "max_usage_count") + and available_tool.max_usage_count is not None + ): + self._printer.print( + content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}", + color="blue", + ) + + except Exception as e: + self.on_tool_error(tool=tool, tool_calling=calling, e=e) + self._run_attempts += 1 + if self._run_attempts > self._max_parsing_attempts: + self._telemetry.tool_usage_error(llm=self.function_calling_llm) + error_message = self._i18n.errors( + "tool_usage_exception" + ).format(error=e, tool=tool.name, tool_inputs=tool.description) + result = ToolUsageError( + f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}" + ).message + if self.task: + self.task.increment_tools_errors() + if self.agent and self.agent.verbose: + self._printer.print( + content=f"\n\n{error_message}\n", color="red" + ) + else: + if self.task: + self.task.increment_tools_errors() + should_retry = True + + finally: + if started_event_emitted: + self.on_tool_use_finished( + tool=tool, + tool_calling=calling, + from_cache=from_cache, + started_at=started_at, + result=result, ) - self._telemetry.tool_usage( - llm=self.function_calling_llm, - tool_name=tool.name, - attempts=self._run_attempts, - ) - result = self._format_result(result=result) - data = { - "result": result, - "tool_name": tool.name, - "tool_args": calling.arguments, - } - self.on_tool_use_finished( - tool=tool, - tool_calling=calling, - from_cache=from_cache, - started_at=started_at, - result=result, - ) - - if ( - hasattr(available_tool, "result_as_answer") - and available_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "cache_function" - ): - result_as_answer = available_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "result_as_answer" - data["result_as_answer"] = result_as_answer # type: ignore - - if self.agent and hasattr(self.agent, "tools_results"): - self.agent.tools_results.append(data) - - if available_tool and hasattr(available_tool, "current_usage_count"): - available_tool.current_usage_count += 1 - if ( - hasattr(available_tool, "max_usage_count") - and available_tool.max_usage_count is not None - ): - self._printer.print( - content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}", - color="blue", - ) + # Handle retry after finally block ensures finished event was emitted + if should_retry: + return self.use(calling=calling, tool_string=tool_string) return result