refactor: use try/finally for event pairing in tool usage

This commit is contained in:
Greyson LaLonde
2026-01-20 01:15:05 -05:00
parent 161f9bd063
commit da07bd4d9f

View File

@@ -241,6 +241,9 @@ class ToolUsage:
if self.task: if self.task:
self.task.increment_tools_errors() self.task.increment_tools_errors()
started_at = time.time()
started_event_emitted = False
if self.agent: if self.agent:
event_data = { event_data = {
"agent_key": self.agent.key, "agent_key": self.agent.key,
@@ -258,151 +261,160 @@ class ToolUsage:
event_data["task_name"] = self.task.name or self.task.description event_data["task_name"] = self.task.name or self.task.description
event_data["task_id"] = str(self.task.id) event_data["task_id"] = str(self.task.id)
crewai_event_bus.emit(self, ToolUsageStartedEvent(**event_data)) crewai_event_bus.emit(self, ToolUsageStartedEvent(**event_data))
started_event_emitted = True
started_at = time.time()
from_cache = False from_cache = False
result = None # type: ignore result = None # type: ignore
should_retry = False
available_tool = None
if self.tools_handler and self.tools_handler.cache: try:
input_str = "" if self.tools_handler and self.tools_handler.cache:
if calling.arguments: input_str = ""
if isinstance(calling.arguments, dict): if calling.arguments:
input_str = json.dumps(calling.arguments) if isinstance(calling.arguments, dict):
else: input_str = json.dumps(calling.arguments)
input_str = str(calling.arguments) else:
input_str = str(calling.arguments)
result = self.tools_handler.cache.read( result = self.tools_handler.cache.read(
tool=calling.tool_name, input=input_str tool=calling.tool_name, input=input_str
) # type: ignore ) # type: ignore
from_cache = result is not None from_cache = result is not None
available_tool = next( available_tool = next(
( (
available_tool available_tool
for available_tool in self.tools for available_tool in self.tools
if available_tool.name == tool.name if available_tool.name == tool.name
), ),
None, None,
) )
usage_limit_error = self._check_usage_limit(available_tool, tool.name) usage_limit_error = self._check_usage_limit(available_tool, tool.name)
if usage_limit_error: if usage_limit_error:
try:
result = usage_limit_error result = usage_limit_error
self._telemetry.tool_usage_error(llm=self.function_calling_llm) self._telemetry.tool_usage_error(llm=self.function_calling_llm)
return self._format_result(result=result) result = self._format_result(result=result)
except Exception: # Don't return early - fall through to finally block
if self.task: elif result is None:
self.task.increment_tools_errors() try:
if calling.tool_name in [
if result is None: "Delegate work to coworker",
try: "Ask question to coworker",
if calling.tool_name in [ ]:
"Delegate work to coworker", coworker = (
"Ask question to coworker", calling.arguments.get("coworker")
]: if calling.arguments
coworker = ( else None
calling.arguments.get("coworker") if calling.arguments else None
)
if self.task:
self.task.increment_delegations(coworker)
if calling.arguments:
try:
acceptable_args = tool.args_schema.model_json_schema()[
"properties"
].keys()
arguments = {
k: v
for k, v in calling.arguments.items()
if k in acceptable_args
}
arguments = self._add_fingerprint_metadata(arguments)
result = await tool.ainvoke(input=arguments)
except Exception:
arguments = calling.arguments
arguments = self._add_fingerprint_metadata(arguments)
result = await tool.ainvoke(input=arguments)
else:
arguments = self._add_fingerprint_metadata({})
result = await tool.ainvoke(input=arguments)
except Exception as e:
self.on_tool_error(tool=tool, tool_calling=calling, e=e)
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
error_message = self._i18n.errors("tool_usage_exception").format(
error=e, tool=tool.name, tool_inputs=tool.description
)
error = ToolUsageError(
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
).message
if self.task:
self.task.increment_tools_errors()
if self.agent and self.agent.verbose:
self._printer.print(
content=f"\n\n{error_message}\n", color="red"
) )
return error if self.task:
self.task.increment_delegations(coworker)
if self.task: if calling.arguments:
self.task.increment_tools_errors() try:
return await self.ause(calling=calling, tool_string=tool_string) acceptable_args = tool.args_schema.model_json_schema()[
"properties"
].keys()
arguments = {
k: v
for k, v in calling.arguments.items()
if k in acceptable_args
}
arguments = self._add_fingerprint_metadata(arguments)
result = await tool.ainvoke(input=arguments)
except Exception:
arguments = calling.arguments
arguments = self._add_fingerprint_metadata(arguments)
result = await tool.ainvoke(input=arguments)
else:
arguments = self._add_fingerprint_metadata({})
result = await tool.ainvoke(input=arguments)
if self.tools_handler: if self.tools_handler:
should_cache = True should_cache = True
if ( if (
hasattr(available_tool, "cache_function") hasattr(available_tool, "cache_function")
and available_tool.cache_function and available_tool.cache_function
): ):
should_cache = available_tool.cache_function( should_cache = available_tool.cache_function(
calling.arguments, result calling.arguments, result
)
self.tools_handler.on_tool_use(
calling=calling, output=result, should_cache=should_cache
)
self._telemetry.tool_usage(
llm=self.function_calling_llm,
tool_name=tool.name,
attempts=self._run_attempts,
) )
result = self._format_result(result=result)
data = {
"result": result,
"tool_name": tool.name,
"tool_args": calling.arguments,
}
self.tools_handler.on_tool_use( if (
calling=calling, output=result, should_cache=should_cache hasattr(available_tool, "result_as_answer")
and available_tool.result_as_answer
):
result_as_answer = available_tool.result_as_answer
data["result_as_answer"] = result_as_answer
if self.agent and hasattr(self.agent, "tools_results"):
self.agent.tools_results.append(data)
if available_tool and hasattr(
available_tool, "current_usage_count"
):
available_tool.current_usage_count += 1
if (
hasattr(available_tool, "max_usage_count")
and available_tool.max_usage_count is not None
):
self._printer.print(
content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
color="blue",
)
except Exception as e:
self.on_tool_error(tool=tool, tool_calling=calling, e=e)
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
error_message = self._i18n.errors(
"tool_usage_exception"
).format(error=e, tool=tool.name, tool_inputs=tool.description)
result = ToolUsageError(
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
).message
if self.task:
self.task.increment_tools_errors()
if self.agent and self.agent.verbose:
self._printer.print(
content=f"\n\n{error_message}\n", color="red"
)
else:
if self.task:
self.task.increment_tools_errors()
should_retry = True
finally:
if started_event_emitted:
self.on_tool_use_finished(
tool=tool,
tool_calling=calling,
from_cache=from_cache,
started_at=started_at,
result=result,
) )
self._telemetry.tool_usage( # Handle retry after finally block ensures finished event was emitted
llm=self.function_calling_llm, if should_retry:
tool_name=tool.name, return await self.ause(calling=calling, tool_string=tool_string)
attempts=self._run_attempts,
)
result = self._format_result(result=result)
data = {
"result": result,
"tool_name": tool.name,
"tool_args": calling.arguments,
}
self.on_tool_use_finished(
tool=tool,
tool_calling=calling,
from_cache=from_cache,
started_at=started_at,
result=result,
)
if (
hasattr(available_tool, "result_as_answer")
and available_tool.result_as_answer # type: ignore
):
result_as_answer = available_tool.result_as_answer # type: ignore
data["result_as_answer"] = result_as_answer # type: ignore
if self.agent and hasattr(self.agent, "tools_results"):
self.agent.tools_results.append(data)
if available_tool and hasattr(available_tool, "current_usage_count"):
available_tool.current_usage_count += 1
if (
hasattr(available_tool, "max_usage_count")
and available_tool.max_usage_count is not None
):
self._printer.print(
content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
color="blue",
)
return result return result
@@ -412,6 +424,7 @@ class ToolUsage:
tool: CrewStructuredTool, tool: CrewStructuredTool,
calling: ToolCalling | InstructorToolCalling, calling: ToolCalling | InstructorToolCalling,
) -> str: ) -> str:
# Repeated usage check happens before event emission - safe to return early
if self._check_tool_repeated_usage(calling=calling): if self._check_tool_repeated_usage(calling=calling):
try: try:
result = self._i18n.errors("task_repeated_usage").format( result = self._i18n.errors("task_repeated_usage").format(
@@ -428,6 +441,9 @@ class ToolUsage:
if self.task: if self.task:
self.task.increment_tools_errors() self.task.increment_tools_errors()
started_at = time.time()
started_event_emitted = False
if self.agent: if self.agent:
event_data = { event_data = {
"agent_key": self.agent.key, "agent_key": self.agent.key,
@@ -446,155 +462,160 @@ class ToolUsage:
event_data["task_name"] = self.task.name or self.task.description event_data["task_name"] = self.task.name or self.task.description
event_data["task_id"] = str(self.task.id) event_data["task_id"] = str(self.task.id)
crewai_event_bus.emit(self, ToolUsageStartedEvent(**event_data)) crewai_event_bus.emit(self, ToolUsageStartedEvent(**event_data))
started_event_emitted = True
started_at = time.time()
from_cache = False from_cache = False
result = None # type: ignore result = None # type: ignore
should_retry = False
available_tool = None
if self.tools_handler and self.tools_handler.cache: try:
input_str = "" if self.tools_handler and self.tools_handler.cache:
if calling.arguments: input_str = ""
if isinstance(calling.arguments, dict): if calling.arguments:
import json if isinstance(calling.arguments, dict):
input_str = json.dumps(calling.arguments)
else:
input_str = str(calling.arguments)
input_str = json.dumps(calling.arguments) result = self.tools_handler.cache.read(
else: tool=calling.tool_name, input=input_str
input_str = str(calling.arguments) ) # type: ignore
from_cache = result is not None
result = self.tools_handler.cache.read( available_tool = next(
tool=calling.tool_name, input=input_str (
) # type: ignore available_tool
from_cache = result is not None for available_tool in self.tools
if available_tool.name == tool.name
),
None,
)
available_tool = next( usage_limit_error = self._check_usage_limit(available_tool, tool.name)
( if usage_limit_error:
available_tool
for available_tool in self.tools
if available_tool.name == tool.name
),
None,
)
usage_limit_error = self._check_usage_limit(available_tool, tool.name)
if usage_limit_error:
try:
result = usage_limit_error result = usage_limit_error
self._telemetry.tool_usage_error(llm=self.function_calling_llm) self._telemetry.tool_usage_error(llm=self.function_calling_llm)
return self._format_result(result=result) result = self._format_result(result=result)
except Exception: # Don't return early - fall through to finally block
if self.task: elif result is None:
self.task.increment_tools_errors() try:
if calling.tool_name in [
if result is None: "Delegate work to coworker",
try: "Ask question to coworker",
if calling.tool_name in [ ]:
"Delegate work to coworker", coworker = (
"Ask question to coworker", calling.arguments.get("coworker")
]: if calling.arguments
coworker = ( else None
calling.arguments.get("coworker") if calling.arguments else None
)
if self.task:
self.task.increment_delegations(coworker)
if calling.arguments:
try:
acceptable_args = tool.args_schema.model_json_schema()[
"properties"
].keys()
arguments = {
k: v
for k, v in calling.arguments.items()
if k in acceptable_args
}
# Add fingerprint metadata if available
arguments = self._add_fingerprint_metadata(arguments)
result = tool.invoke(input=arguments)
except Exception:
arguments = calling.arguments
# Add fingerprint metadata if available
arguments = self._add_fingerprint_metadata(arguments)
result = tool.invoke(input=arguments)
else:
# Add fingerprint metadata even to empty arguments
arguments = self._add_fingerprint_metadata({})
result = tool.invoke(input=arguments)
except Exception as e:
self.on_tool_error(tool=tool, tool_calling=calling, e=e)
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
error_message = self._i18n.errors("tool_usage_exception").format(
error=e, tool=tool.name, tool_inputs=tool.description
)
error = ToolUsageError(
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
).message
if self.task:
self.task.increment_tools_errors()
if self.agent and self.agent.verbose:
self._printer.print(
content=f"\n\n{error_message}\n", color="red"
) )
return error if self.task:
self.task.increment_delegations(coworker)
if self.task: if calling.arguments:
self.task.increment_tools_errors() try:
return self.use(calling=calling, tool_string=tool_string) acceptable_args = tool.args_schema.model_json_schema()[
"properties"
].keys()
arguments = {
k: v
for k, v in calling.arguments.items()
if k in acceptable_args
}
arguments = self._add_fingerprint_metadata(arguments)
result = tool.invoke(input=arguments)
except Exception:
arguments = calling.arguments
arguments = self._add_fingerprint_metadata(arguments)
result = tool.invoke(input=arguments)
else:
arguments = self._add_fingerprint_metadata({})
result = tool.invoke(input=arguments)
if self.tools_handler: if self.tools_handler:
should_cache = True should_cache = True
if ( if (
hasattr(available_tool, "cache_function") hasattr(available_tool, "cache_function")
and available_tool.cache_function and available_tool.cache_function
): ):
should_cache = available_tool.cache_function( should_cache = available_tool.cache_function(
calling.arguments, result calling.arguments, result
)
self.tools_handler.on_tool_use(
calling=calling, output=result, should_cache=should_cache
)
self._telemetry.tool_usage(
llm=self.function_calling_llm,
tool_name=tool.name,
attempts=self._run_attempts,
) )
result = self._format_result(result=result)
data = {
"result": result,
"tool_name": tool.name,
"tool_args": calling.arguments,
}
self.tools_handler.on_tool_use( if (
calling=calling, output=result, should_cache=should_cache hasattr(available_tool, "result_as_answer")
and available_tool.result_as_answer
):
result_as_answer = available_tool.result_as_answer
data["result_as_answer"] = result_as_answer
if self.agent and hasattr(self.agent, "tools_results"):
self.agent.tools_results.append(data)
if available_tool and hasattr(
available_tool, "current_usage_count"
):
available_tool.current_usage_count += 1
if (
hasattr(available_tool, "max_usage_count")
and available_tool.max_usage_count is not None
):
self._printer.print(
content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
color="blue",
)
except Exception as e:
self.on_tool_error(tool=tool, tool_calling=calling, e=e)
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
error_message = self._i18n.errors(
"tool_usage_exception"
).format(error=e, tool=tool.name, tool_inputs=tool.description)
result = ToolUsageError(
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
).message
if self.task:
self.task.increment_tools_errors()
if self.agent and self.agent.verbose:
self._printer.print(
content=f"\n\n{error_message}\n", color="red"
)
else:
if self.task:
self.task.increment_tools_errors()
should_retry = True
finally:
if started_event_emitted:
self.on_tool_use_finished(
tool=tool,
tool_calling=calling,
from_cache=from_cache,
started_at=started_at,
result=result,
) )
self._telemetry.tool_usage(
llm=self.function_calling_llm,
tool_name=tool.name,
attempts=self._run_attempts,
)
result = self._format_result(result=result)
data = {
"result": result,
"tool_name": tool.name,
"tool_args": calling.arguments,
}
self.on_tool_use_finished( # Handle retry after finally block ensures finished event was emitted
tool=tool, if should_retry:
tool_calling=calling, return self.use(calling=calling, tool_string=tool_string)
from_cache=from_cache,
started_at=started_at,
result=result,
)
if (
hasattr(available_tool, "result_as_answer")
and available_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
):
result_as_answer = available_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "result_as_answer"
data["result_as_answer"] = result_as_answer # type: ignore
if self.agent and hasattr(self.agent, "tools_results"):
self.agent.tools_results.append(data)
if available_tool and hasattr(available_tool, "current_usage_count"):
available_tool.current_usage_count += 1
if (
hasattr(available_tool, "max_usage_count")
and available_tool.max_usage_count is not None
):
self._printer.print(
content=f"Tool '{available_tool.name}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
color="blue",
)
return result return result