Mirror of https://github.com/crewAIInc/crewAI.git (synced 2026-01-09 08:08:32 +00:00)
625 lines · 24 KiB · Python

import ast
import datetime
import json
import time
from dataclasses import dataclass
from difflib import SequenceMatcher
from json import JSONDecodeError
from textwrap import dedent
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import json5
from json_repair import repair_json

from crewai.agents.tools_handler import ToolsHandler
from crewai.task import Task
from crewai.telemetry import Telemetry
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
from crewai.utilities import I18N, Converter, Printer
from crewai.utilities.agent_utils import (
    get_tool_names,
    render_text_description_and_args,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.tool_usage_events import (
    ToolSelectionErrorEvent,
    ToolUsageErrorEvent,
    ToolUsageFinishedEvent,
    ToolValidateInputErrorEvent,
)

if TYPE_CHECKING:
    from crewai.agents.agent_builder.base_agent import BaseAgent
    from crewai.lite_agent import LiteAgent

OPENAI_BIGGER_MODELS = [
    "gpt-4",
    "gpt-4o",
    "o1-preview",
    "o1-mini",
    "o1",
    "o3",
    "o3-mini",
]


class ToolUsageErrorException(Exception):
    """Exception raised for errors in the tool usage."""

    def __init__(self, message: str) -> None:
        self.message = message
        super().__init__(self.message)


class ToolUsage:
    """
    Class that represents the usage of a tool by an agent.

    Attributes:
        task: Task being executed.
        tools_handler: Tools handler that will manage the tool usage.
        tools: List of tools available for the agent.
        original_tools: Original tools available for the agent before being converted to BaseTool.
        tools_description: Description of the tools available for the agent.
        tools_names: Names of the tools available for the agent.
        function_calling_llm: Language model to be used for the tool usage.
    """

    def __init__(
        self,
        tools_handler: Optional[ToolsHandler],
        tools: List[CrewStructuredTool],
        task: Optional[Task],
        function_calling_llm: Any,
        agent: Optional[Union["BaseAgent", "LiteAgent"]] = None,
        action: Any = None,
        fingerprint_context: Optional[Dict[str, str]] = None,
    ) -> None:
        self._i18n: I18N = agent.i18n if agent else I18N()
        self._printer: Printer = Printer()
        self._telemetry: Telemetry = Telemetry()
        self._run_attempts: int = 1
        self._max_parsing_attempts: int = 3
        self._remember_format_after_usages: int = 3
        self.agent = agent
        self.tools_description = render_text_description_and_args(tools)
        self.tools_names = get_tool_names(tools)
        self.tools_handler = tools_handler
        self.tools = tools
        self.task = task
        self.action = action
        self.function_calling_llm = function_calling_llm
        self.fingerprint_context = fingerprint_context or {}

        # Set the maximum parsing attempts for bigger models
        if (
            self.function_calling_llm
            and self.function_calling_llm in OPENAI_BIGGER_MODELS
        ):
            self._max_parsing_attempts = 2
            self._remember_format_after_usages = 4

    def parse_tool_calling(self, tool_string: str):
        """Parse the tool string and return the tool calling."""
        return self._tool_calling(tool_string)

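    # `use` is the public entry point for executing a parsed tool call: it
    # reports parsing errors, resolves the requested tool via `_select_tool`,
    # and delegates the actual invocation to `_use`. The multimodal
    # "add_image" tool gets its own try/except so failures are returned as an
    # error string rather than propagating.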
    def use(
        self, calling: Union[ToolCalling, InstructorToolCalling], tool_string: str
    ) -> str:
        if isinstance(calling, ToolUsageErrorException):
            error = calling.message
            if self.agent and self.agent.verbose:
                self._printer.print(content=f"\n\n{error}\n", color="red")
            if self.task:
                self.task.increment_tools_errors()
            return error

        try:
            tool = self._select_tool(calling.tool_name)
        except Exception as e:
            error = getattr(e, "message", str(e))
            if self.task:
                self.task.increment_tools_errors()
            if self.agent and self.agent.verbose:
                self._printer.print(content=f"\n\n{error}\n", color="red")
            return error

        if (
            isinstance(tool, CrewStructuredTool)
            and tool.name == self._i18n.tools("add_image")["name"]  # type: ignore
        ):
            try:
                result = self._use(tool_string=tool_string, tool=tool, calling=calling)
                return result

            except Exception as e:
                error = getattr(e, "message", str(e))
                if self.task:
                    self.task.increment_tools_errors()
                if self.agent and self.agent.verbose:
                    self._printer.print(content=f"\n\n{error}\n", color="red")
                return error

        return f"{self._use(tool_string=tool_string, tool=tool, calling=calling)}"

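    # `_use` performs the actual tool invocation:
    #   1. returns a "repeated usage" reminder instead of re-invoking when the
    #      same tool was just called with the same arguments,
    #   2. checks the tools handler cache before invoking,
    #   3. filters arguments against the tool's args_schema and injects
    #      fingerprint metadata,
    #   4. on failure, retries via `use` until `_max_parsing_attempts` is
    #      exceeded,
    #   5. records telemetry, caches the result, and emits usage events.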
    def _use(
        self,
        tool_string: str,
        tool: CrewStructuredTool,
        calling: Union[ToolCalling, InstructorToolCalling],
    ) -> str:
        if self._check_tool_repeated_usage(calling=calling):  # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None)
            try:
                result = self._i18n.errors("task_repeated_usage").format(
                    tool_names=self.tools_names
                )
                self._telemetry.tool_repeated_usage(
                    llm=self.function_calling_llm,
                    tool_name=tool.name,
                    attempts=self._run_attempts,
                )
                result = self._format_result(result=result)  # type: ignore # "_format_result" of "ToolUsage" does not return a value (it only ever returns None)
                return result  # type: ignore # Fix the return type of this function

            except Exception:
                if self.task:
                    self.task.increment_tools_errors()

        started_at = time.time()
        from_cache = False
        result = None  # type: ignore

        if self.tools_handler and self.tools_handler.cache:
            result = self.tools_handler.cache.read(
                tool=calling.tool_name, input=calling.arguments
            )  # type: ignore
            from_cache = result is not None

        available_tool = next(
            (
                available_tool
                for available_tool in self.tools
                if available_tool.name == tool.name
            ),
            None,
        )

        if result is None:
            try:
                if calling.tool_name in [
                    "Delegate work to coworker",
                    "Ask question to coworker",
                ]:
                    coworker = (
                        calling.arguments.get("coworker") if calling.arguments else None
                    )
                    if self.task:
                        self.task.increment_delegations(coworker)

                if calling.arguments:
                    try:
                        acceptable_args = tool.args_schema.model_json_schema()[
                            "properties"
                        ].keys()  # type: ignore
                        arguments = {
                            k: v
                            for k, v in calling.arguments.items()
                            if k in acceptable_args
                        }
                        # Add fingerprint metadata if available
                        arguments = self._add_fingerprint_metadata(arguments)
                        result = tool.invoke(input=arguments)
                    except Exception:
                        arguments = calling.arguments
                        # Add fingerprint metadata if available
                        arguments = self._add_fingerprint_metadata(arguments)
                        result = tool.invoke(input=arguments)
                else:
                    # Add fingerprint metadata even to empty arguments
                    arguments = self._add_fingerprint_metadata({})
                    result = tool.invoke(input=arguments)
            except Exception as e:
                self.on_tool_error(tool=tool, tool_calling=calling, e=e)
                self._run_attempts += 1
                if self._run_attempts > self._max_parsing_attempts:
                    self._telemetry.tool_usage_error(llm=self.function_calling_llm)
                    error_message = self._i18n.errors("tool_usage_exception").format(
                        error=e, tool=tool.name, tool_inputs=tool.description
                    )
                    error = ToolUsageErrorException(
                        f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
                    ).message
                    if self.task:
                        self.task.increment_tools_errors()
                    if self.agent and self.agent.verbose:
                        self._printer.print(
                            content=f"\n\n{error_message}\n", color="red"
                        )
                    return error  # type: ignore # No return value expected

                if self.task:
                    self.task.increment_tools_errors()
                return self.use(calling=calling, tool_string=tool_string)  # type: ignore # No return value expected

        if self.tools_handler:
            should_cache = True
            if (
                hasattr(available_tool, "cache_function")
                and available_tool.cache_function  # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
            ):
                should_cache = available_tool.cache_function(  # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
                    calling.arguments, result
                )

            self.tools_handler.on_tool_use(
                calling=calling, output=result, should_cache=should_cache
            )

        self._telemetry.tool_usage(
            llm=self.function_calling_llm,
            tool_name=tool.name,
            attempts=self._run_attempts,
        )
        result = self._format_result(result=result)  # type: ignore # "_format_result" of "ToolUsage" does not return a value (it only ever returns None)
        data = {
            "result": result,
            "tool_name": tool.name,
            "tool_args": calling.arguments,
        }

        self.on_tool_use_finished(
            tool=tool,
            tool_calling=calling,
            from_cache=from_cache,
            started_at=started_at,
            result=result,
        )

        if (
            hasattr(available_tool, "result_as_answer")
            and available_tool.result_as_answer  # type: ignore # Item "None" of "Any | None" has no attribute "result_as_answer"
        ):
            result_as_answer = available_tool.result_as_answer  # type: ignore # Item "None" of "Any | None" has no attribute "result_as_answer"
            data["result_as_answer"] = result_as_answer  # type: ignore

        if self.agent and hasattr(self.agent, "tools_results"):
            self.agent.tools_results.append(data)

        return result

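    # Result formatting helpers: every `_remember_format_after_usages` tool
    # uses, the available tool descriptions are appended to the result again
    # so the agent is reminded of the expected tool-calling format.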
    def _format_result(self, result: Any) -> str:
        if self.task:
            self.task.used_tools += 1
        if self._should_remember_format():
            result = self._remember_format(result=result)
        return str(result)

    def _should_remember_format(self) -> bool:
        if self.task:
            return self.task.used_tools % self._remember_format_after_usages == 0
        return False

    def _remember_format(self, result: str) -> str:
        result = str(result)
        result += "\n\n" + self._i18n.slice("tools").format(
            tools=self.tools_description, tool_names=self.tools_names
        )
        return result

    def _check_tool_repeated_usage(
        self, calling: Union[ToolCalling, InstructorToolCalling]
    ) -> bool:
        if not self.tools_handler:
            return False
        if last_tool_usage := self.tools_handler.last_used_tool:
            return (calling.tool_name == last_tool_usage.tool_name) and (
                calling.arguments == last_tool_usage.arguments
            )
        return False

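    # Tool selection is fuzzy: tools are ranked by `SequenceMatcher` similarity
    # to the requested name, and a tool is accepted on an exact
    # (case-insensitive) match or a similarity ratio above 0.85. If nothing
    # matches, a `ToolSelectionErrorEvent` is emitted and an exception raised.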
    def _select_tool(self, tool_name: str) -> Any:
        order_tools = sorted(
            self.tools,
            key=lambda tool: SequenceMatcher(
                None, tool.name.lower().strip(), tool_name.lower().strip()
            ).ratio(),
            reverse=True,
        )
        for tool in order_tools:
            if (
                tool.name.lower().strip() == tool_name.lower().strip()
                or SequenceMatcher(
                    None, tool.name.lower().strip(), tool_name.lower().strip()
                ).ratio()
                > 0.85
            ):
                return tool
        if self.task:
            self.task.increment_tools_errors()
        tool_selection_data: Dict[str, Any] = {
            "agent_key": getattr(self.agent, "key", None) if self.agent else None,
            "agent_role": getattr(self.agent, "role", None) if self.agent else None,
            "tool_name": tool_name,
            "tool_args": {},
            "tool_class": self.tools_description,
        }
        if tool_name and tool_name != "":
            error = f"Action '{tool_name}' doesn't exist, these are the only available Actions:\n{self.tools_description}"
            crewai_event_bus.emit(
                self,
                ToolSelectionErrorEvent(
                    **tool_selection_data,
                    error=error,
                ),
            )
            raise Exception(error)
        else:
            error = f"I forgot the Action name, these are the only available Actions: {self.tools_description}"
            crewai_event_bus.emit(
                self,
                ToolSelectionErrorEvent(
                    **tool_selection_data,
                    error=error,
                ),
            )
            raise Exception(error)

    def _render(self) -> str:
        """Render the tool name and description in plain text."""
        descriptions = []
        for tool in self.tools:
            descriptions.append(tool.description)
        return "\n--\n".join(descriptions)

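    # When a dedicated function-calling LLM is configured, tool-call parsing is
    # delegated to it through `Converter`, targeting `InstructorToolCalling`
    # when the LLM supports native function calling and plain `ToolCalling`
    # otherwise.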
    def _function_calling(
        self, tool_string: str
    ) -> Union[ToolCalling, InstructorToolCalling]:
        model = (
            InstructorToolCalling
            if self.function_calling_llm.supports_function_calling()
            else ToolCalling
        )
        converter = Converter(
            text=f"Only tools available:\n###\n{self._render()}\n\nReturn a valid schema for the tool, the tool name must be exactly equal one of the options, use this text to inform the valid output schema:\n\n### TEXT \n{tool_string}",
            llm=self.function_calling_llm,
            model=model,
            instructions=dedent(
                """\
                The schema should have the following structure, only two keys:
                - tool_name: str
                - arguments: dict (always a dictionary, with all arguments being passed)

                Example:
                {"tool_name": "tool name", "arguments": {"arg_name1": "value", "arg_name2": 2}}""",
            ),
            max_attempts=1,
        )
        tool_object = converter.to_pydantic()
        if not isinstance(tool_object, (ToolCalling, InstructorToolCalling)):
            raise ToolUsageErrorException("Failed to parse tool calling")

        return tool_object

    def _original_tool_calling(
        self, tool_string: str, raise_error: bool = False
    ) -> Union[ToolCalling, InstructorToolCalling, ToolUsageErrorException]:
        tool_name = self.action.tool
        tool = self._select_tool(tool_name)
        try:
            arguments = self._validate_tool_input(self.action.tool_input)

        except Exception:
            if raise_error:
                raise
            else:
                return ToolUsageErrorException(
                    f"{self._i18n.errors('tool_arguments_error')}"
                )

        if not isinstance(arguments, dict):
            if raise_error:
                # A bare `raise` has no active exception here, so raise an
                # explicit error instead.
                raise ToolUsageErrorException(
                    f"{self._i18n.errors('tool_arguments_error')}"
                )
            else:
                return ToolUsageErrorException(
                    f"{self._i18n.errors('tool_arguments_error')}"
                )

        return ToolCalling(
            tool_name=tool.name,
            arguments=arguments,
        )

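    # `_tool_calling` first tries to build the call from the parsed agent
    # action (`_original_tool_calling`); if that fails it falls back to the
    # function-calling LLM when one is configured, retrying until
    # `_max_parsing_attempts` is exhausted and then returning a
    # `ToolUsageErrorException` describing the failure.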
    def _tool_calling(
        self, tool_string: str
    ) -> Union[ToolCalling, InstructorToolCalling, ToolUsageErrorException]:
        try:
            try:
                return self._original_tool_calling(tool_string, raise_error=True)
            except Exception:
                if self.function_calling_llm:
                    return self._function_calling(tool_string)
                else:
                    return self._original_tool_calling(tool_string)
        except Exception as e:
            self._run_attempts += 1
            if self._run_attempts > self._max_parsing_attempts:
                self._telemetry.tool_usage_error(llm=self.function_calling_llm)
                if self.task:
                    self.task.increment_tools_errors()
                if self.agent and self.agent.verbose:
                    self._printer.print(content=f"\n\n{e}\n", color="red")
                return ToolUsageErrorException(  # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling")
                    f"{self._i18n.errors('tool_usage_error').format(error=e)}\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
                )
            return self._tool_calling(tool_string)

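    # Tool input is parsed with a forgiving cascade; for example (illustrative
    # inputs, not taken from the codebase):
    #   1. strict JSON            -> '{"query": "ai news"}'
    #   2. Python literal syntax  -> "{'query': 'ai news'}"
    #   3. JSON5 (lenient JSON)   -> '{query: "ai news"}'
    #   4. json_repair as a last resort for malformed payloads.
    # Anything that still does not yield a dict raises and emits a
    # `ToolValidateInputErrorEvent`.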
    def _validate_tool_input(self, tool_input: Optional[str]) -> Dict[str, Any]:
        if tool_input is None:
            return {}

        if not isinstance(tool_input, str) or not tool_input.strip():
            raise Exception(
                "Tool input must be a valid dictionary in JSON or Python literal format"
            )

        # Attempt 1: Parse as JSON
        try:
            arguments = json.loads(tool_input)
            if isinstance(arguments, dict):
                return arguments
        except (JSONDecodeError, TypeError):
            pass  # Continue to the next parsing attempt

        # Attempt 2: Parse as Python literal
        try:
            arguments = ast.literal_eval(tool_input)
            if isinstance(arguments, dict):
                return arguments
        except (ValueError, SyntaxError):
            pass  # Continue to the next parsing attempt

        # Attempt 3: Parse as JSON5
        try:
            arguments = json5.loads(tool_input)
            if isinstance(arguments, dict):
                return arguments
        except (JSONDecodeError, ValueError, TypeError):
            pass  # Continue to the next parsing attempt

        # Attempt 4: Repair JSON
        try:
            repaired_input = str(repair_json(tool_input, skip_json_loads=True))
            self._printer.print(
                content=f"Repaired JSON: {repaired_input}", color="blue"
            )
            arguments = json.loads(repaired_input)
            if isinstance(arguments, dict):
                return arguments
        except Exception as e:
            error = f"Failed to repair JSON: {e}"
            self._printer.print(content=error, color="red")

        error_message = (
            "Tool input must be a valid dictionary in JSON or Python literal format"
        )
        self._emit_validate_input_error(error_message)
        # If all parsing attempts fail, raise an error
        raise Exception(error_message)

    def _emit_validate_input_error(self, final_error: str):
        tool_selection_data = {
            "agent_key": getattr(self.agent, "key", None) if self.agent else None,
            "agent_role": getattr(self.agent, "role", None) if self.agent else None,
            "tool_name": self.action.tool,
            "tool_args": str(self.action.tool_input),
            "tool_class": self.__class__.__name__,
            "agent": self.agent,  # Adding agent for fingerprint extraction
        }

        # Include fingerprint context if available
        if self.fingerprint_context:
            tool_selection_data.update(self.fingerprint_context)

        crewai_event_bus.emit(
            self,
            ToolValidateInputErrorEvent(**tool_selection_data, error=final_error),
        )

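    # Event helpers: errors and completed tool runs are published on the
    # crewai_event_bus, with fingerprint context (when present) merged into
    # the event payload by `_prepare_event_data`.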
    def on_tool_error(
        self,
        tool: Any,
        tool_calling: Union[ToolCalling, InstructorToolCalling],
        e: Exception,
    ) -> None:
        event_data = self._prepare_event_data(tool, tool_calling)
        crewai_event_bus.emit(self, ToolUsageErrorEvent(**{**event_data, "error": e}))

    def on_tool_use_finished(
        self,
        tool: Any,
        tool_calling: Union[ToolCalling, InstructorToolCalling],
        from_cache: bool,
        started_at: float,
        result: Any,
    ) -> None:
        finished_at = time.time()
        event_data = self._prepare_event_data(tool, tool_calling)
        event_data.update(
            {
                "started_at": datetime.datetime.fromtimestamp(started_at),
                "finished_at": datetime.datetime.fromtimestamp(finished_at),
                "from_cache": from_cache,
                "output": result,
            }
        )
        crewai_event_bus.emit(self, ToolUsageFinishedEvent(**event_data))

    def _prepare_event_data(
        self, tool: Any, tool_calling: Union[ToolCalling, InstructorToolCalling]
    ) -> dict:
        event_data = {
            "run_attempts": self._run_attempts,
            "delegations": self.task.delegations if self.task else 0,
            "tool_name": tool.name,
            "tool_args": tool_calling.arguments,
            "tool_class": tool.__class__.__name__,
            "agent_key": (
                getattr(self.agent, "key", "unknown") if self.agent else "unknown"
            ),
            "agent_role": (
                getattr(self.agent, "_original_role", None)
                or getattr(self.agent, "role", "unknown")
                if self.agent
                else "unknown"
            ),
        }

        # Include fingerprint context if available
        if self.fingerprint_context:
            event_data.update(self.fingerprint_context)

        return event_data

    def _add_fingerprint_metadata(self, arguments: dict) -> dict:
        """Add fingerprint metadata to tool arguments if available.

        Args:
            arguments: The original tool arguments

        Returns:
            Updated arguments dictionary with fingerprint metadata
        """
        # Create a shallow copy to avoid modifying the original
        arguments = arguments.copy()

        # Add security metadata under a designated key
        if "security_context" not in arguments:
            arguments["security_context"] = {}

        security_context = arguments["security_context"]

        # Add agent fingerprint if available
        if self.agent and hasattr(self.agent, "security_config"):
            security_config = getattr(self.agent, "security_config", None)
            if security_config and hasattr(security_config, "fingerprint"):
                try:
                    security_context["agent_fingerprint"] = (
                        security_config.fingerprint.to_dict()
                    )
                except AttributeError:
                    pass

        # Add task fingerprint if available
        if self.task and hasattr(self.task, "security_config"):
            security_config = getattr(self.task, "security_config", None)
            if security_config and hasattr(security_config, "fingerprint"):
                try:
                    security_context["task_fingerprint"] = (
                        security_config.fingerprint.to_dict()
                    )
                except AttributeError:
                    pass

        return arguments