mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 15:48:29 +00:00
* feat: remove unused code and change ToolUsageStarted event place * feat: run lint * feat: add agent refernece inside liteagent * feat: remove unused logic * feat: Remove not needed event * feat: remove test from tool execution erro: * feat: remove cassete
640 lines
24 KiB
Python
640 lines
24 KiB
Python
import ast
|
|
import datetime
|
|
import json
|
|
import time
|
|
from difflib import SequenceMatcher
|
|
from json import JSONDecodeError
|
|
from textwrap import dedent
|
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
|
|
|
|
import json5
|
|
from json_repair import repair_json
|
|
|
|
from crewai.agents.tools_handler import ToolsHandler
|
|
from crewai.task import Task
|
|
from crewai.telemetry import Telemetry
|
|
from crewai.tools.structured_tool import CrewStructuredTool
|
|
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
|
|
from crewai.utilities import I18N, Converter, Printer
|
|
from crewai.utilities.agent_utils import (
|
|
get_tool_names,
|
|
render_text_description_and_args,
|
|
)
|
|
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
|
from crewai.utilities.events.tool_usage_events import (
|
|
ToolSelectionErrorEvent,
|
|
ToolUsageErrorEvent,
|
|
ToolUsageFinishedEvent,
|
|
ToolUsageStartedEvent,
|
|
ToolValidateInputErrorEvent,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from crewai.agents.agent_builder.base_agent import BaseAgent
|
|
from crewai.lite_agent import LiteAgent
|
|
|
|
OPENAI_BIGGER_MODELS = [
|
|
"gpt-4",
|
|
"gpt-4o",
|
|
"o1-preview",
|
|
"o1-mini",
|
|
"o1",
|
|
"o3",
|
|
"o3-mini",
|
|
]
|
|
|
|
|
|
class ToolUsageErrorException(Exception):
|
|
"""Exception raised for errors in the tool usage."""
|
|
|
|
def __init__(self, message: str) -> None:
|
|
self.message = message
|
|
super().__init__(self.message)
|
|
|
|
|
|
class ToolUsage:
|
|
"""
|
|
Class that represents the usage of a tool by an agent.
|
|
|
|
Attributes:
|
|
task: Task being executed.
|
|
tools_handler: Tools handler that will manage the tool usage.
|
|
tools: List of tools available for the agent.
|
|
original_tools: Original tools available for the agent before being converted to BaseTool.
|
|
tools_description: Description of the tools available for the agent.
|
|
tools_names: Names of the tools available for the agent.
|
|
function_calling_llm: Language model to be used for the tool usage.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
tools_handler: Optional[ToolsHandler],
|
|
tools: List[CrewStructuredTool],
|
|
task: Optional[Task],
|
|
function_calling_llm: Any,
|
|
agent: Optional[Union["BaseAgent", "LiteAgent"]] = None,
|
|
action: Any = None,
|
|
fingerprint_context: Optional[Dict[str, str]] = None,
|
|
) -> None:
|
|
self._i18n: I18N = agent.i18n if agent else I18N()
|
|
self._printer: Printer = Printer()
|
|
self._telemetry: Telemetry = Telemetry()
|
|
self._run_attempts: int = 1
|
|
self._max_parsing_attempts: int = 3
|
|
self._remember_format_after_usages: int = 3
|
|
self.agent = agent
|
|
self.tools_description = render_text_description_and_args(tools)
|
|
self.tools_names = get_tool_names(tools)
|
|
self.tools_handler = tools_handler
|
|
self.tools = tools
|
|
self.task = task
|
|
self.action = action
|
|
self.function_calling_llm = function_calling_llm
|
|
self.fingerprint_context = fingerprint_context or {}
|
|
|
|
# Set the maximum parsing attempts for bigger models
|
|
if (
|
|
self.function_calling_llm
|
|
and self.function_calling_llm in OPENAI_BIGGER_MODELS
|
|
):
|
|
self._max_parsing_attempts = 2
|
|
self._remember_format_after_usages = 4
|
|
|
|
def parse_tool_calling(self, tool_string: str):
|
|
"""Parse the tool string and return the tool calling."""
|
|
return self._tool_calling(tool_string)
|
|
|
|
def use(
|
|
self, calling: Union[ToolCalling, InstructorToolCalling], tool_string: str
|
|
) -> str:
|
|
if isinstance(calling, ToolUsageErrorException):
|
|
error = calling.message
|
|
if self.agent and self.agent.verbose:
|
|
self._printer.print(content=f"\n\n{error}\n", color="red")
|
|
if self.task:
|
|
self.task.increment_tools_errors()
|
|
return error
|
|
|
|
try:
|
|
tool = self._select_tool(calling.tool_name)
|
|
except Exception as e:
|
|
error = getattr(e, "message", str(e))
|
|
if self.task:
|
|
self.task.increment_tools_errors()
|
|
if self.agent and self.agent.verbose:
|
|
self._printer.print(content=f"\n\n{error}\n", color="red")
|
|
return error
|
|
|
|
if (
|
|
isinstance(tool, CrewStructuredTool)
|
|
and tool.name == self._i18n.tools("add_image")["name"] # type: ignore
|
|
):
|
|
try:
|
|
result = self._use(tool_string=tool_string, tool=tool, calling=calling)
|
|
return result
|
|
|
|
except Exception as e:
|
|
error = getattr(e, "message", str(e))
|
|
if self.task:
|
|
self.task.increment_tools_errors()
|
|
if self.agent and self.agent.verbose:
|
|
self._printer.print(content=f"\n\n{error}\n", color="red")
|
|
return error
|
|
|
|
return f"{self._use(tool_string=tool_string, tool=tool, calling=calling)}"
|
|
|
|
def _use(
|
|
self,
|
|
tool_string: str,
|
|
tool: CrewStructuredTool,
|
|
calling: Union[ToolCalling, InstructorToolCalling],
|
|
) -> str:
|
|
if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None)
|
|
try:
|
|
result = self._i18n.errors("task_repeated_usage").format(
|
|
tool_names=self.tools_names
|
|
)
|
|
self._telemetry.tool_repeated_usage(
|
|
llm=self.function_calling_llm,
|
|
tool_name=tool.name,
|
|
attempts=self._run_attempts,
|
|
)
|
|
result = self._format_result(result=result) # type: ignore # "_format_result" of "ToolUsage" does not return a value (it only ever returns None)
|
|
return result # type: ignore # Fix the return type of this function
|
|
|
|
except Exception:
|
|
if self.task:
|
|
self.task.increment_tools_errors()
|
|
|
|
if self.agent:
|
|
event_data = {
|
|
"agent_key": self.agent.key,
|
|
"agent_role": self.agent.role,
|
|
"tool_name": self.action.tool,
|
|
"tool_args": self.action.tool_input,
|
|
"tool_class": self.action.tool,
|
|
"agent": self.agent,
|
|
}
|
|
|
|
if self.agent.fingerprint:
|
|
event_data.update(self.agent.fingerprint)
|
|
|
|
crewai_event_bus.emit(self,ToolUsageStartedEvent(**event_data))
|
|
|
|
started_at = time.time()
|
|
from_cache = False
|
|
result = None # type: ignore
|
|
|
|
if self.tools_handler and self.tools_handler.cache:
|
|
result = self.tools_handler.cache.read(
|
|
tool=calling.tool_name, input=calling.arguments
|
|
) # type: ignore
|
|
from_cache = result is not None
|
|
|
|
available_tool = next(
|
|
(
|
|
available_tool
|
|
for available_tool in self.tools
|
|
if available_tool.name == tool.name
|
|
),
|
|
None,
|
|
)
|
|
|
|
if result is None:
|
|
try:
|
|
if calling.tool_name in [
|
|
"Delegate work to coworker",
|
|
"Ask question to coworker",
|
|
]:
|
|
coworker = (
|
|
calling.arguments.get("coworker") if calling.arguments else None
|
|
)
|
|
if self.task:
|
|
self.task.increment_delegations(coworker)
|
|
|
|
if calling.arguments:
|
|
try:
|
|
acceptable_args = tool.args_schema.model_json_schema()[
|
|
"properties"
|
|
].keys() # type: ignore
|
|
arguments = {
|
|
k: v
|
|
for k, v in calling.arguments.items()
|
|
if k in acceptable_args
|
|
}
|
|
# Add fingerprint metadata if available
|
|
arguments = self._add_fingerprint_metadata(arguments)
|
|
result = tool.invoke(input=arguments)
|
|
except Exception:
|
|
arguments = calling.arguments
|
|
# Add fingerprint metadata if available
|
|
arguments = self._add_fingerprint_metadata(arguments)
|
|
result = tool.invoke(input=arguments)
|
|
else:
|
|
# Add fingerprint metadata even to empty arguments
|
|
arguments = self._add_fingerprint_metadata({})
|
|
result = tool.invoke(input=arguments)
|
|
except Exception as e:
|
|
self.on_tool_error(tool=tool, tool_calling=calling, e=e)
|
|
self._run_attempts += 1
|
|
if self._run_attempts > self._max_parsing_attempts:
|
|
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
|
|
error_message = self._i18n.errors("tool_usage_exception").format(
|
|
error=e, tool=tool.name, tool_inputs=tool.description
|
|
)
|
|
error = ToolUsageErrorException(
|
|
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
|
|
).message
|
|
if self.task:
|
|
self.task.increment_tools_errors()
|
|
if self.agent and self.agent.verbose:
|
|
self._printer.print(
|
|
content=f"\n\n{error_message}\n", color="red"
|
|
)
|
|
return error # type: ignore # No return value expected
|
|
|
|
if self.task:
|
|
self.task.increment_tools_errors()
|
|
return self.use(calling=calling, tool_string=tool_string) # type: ignore # No return value expected
|
|
|
|
if self.tools_handler:
|
|
should_cache = True
|
|
if (
|
|
hasattr(available_tool, "cache_function")
|
|
and available_tool.cache_function # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
|
|
):
|
|
should_cache = available_tool.cache_function( # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
|
|
calling.arguments, result
|
|
)
|
|
|
|
self.tools_handler.on_tool_use(
|
|
calling=calling, output=result, should_cache=should_cache
|
|
)
|
|
self._telemetry.tool_usage(
|
|
llm=self.function_calling_llm,
|
|
tool_name=tool.name,
|
|
attempts=self._run_attempts,
|
|
)
|
|
result = self._format_result(result=result) # type: ignore # "_format_result" of "ToolUsage" does not return a value (it only ever returns None)
|
|
data = {
|
|
"result": result,
|
|
"tool_name": tool.name,
|
|
"tool_args": calling.arguments,
|
|
}
|
|
|
|
self.on_tool_use_finished(
|
|
tool=tool,
|
|
tool_calling=calling,
|
|
from_cache=from_cache,
|
|
started_at=started_at,
|
|
result=result,
|
|
)
|
|
|
|
if (
|
|
hasattr(available_tool, "result_as_answer")
|
|
and available_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
|
|
):
|
|
result_as_answer = available_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "result_as_answer"
|
|
data["result_as_answer"] = result_as_answer # type: ignore
|
|
|
|
if self.agent and hasattr(self.agent, "tools_results"):
|
|
self.agent.tools_results.append(data)
|
|
|
|
return result
|
|
|
|
def _format_result(self, result: Any) -> str:
|
|
if self.task:
|
|
self.task.used_tools += 1
|
|
if self._should_remember_format():
|
|
result = self._remember_format(result=result)
|
|
return str(result)
|
|
|
|
def _should_remember_format(self) -> bool:
|
|
if self.task:
|
|
return self.task.used_tools % self._remember_format_after_usages == 0
|
|
return False
|
|
|
|
def _remember_format(self, result: str) -> str:
|
|
result = str(result)
|
|
result += "\n\n" + self._i18n.slice("tools").format(
|
|
tools=self.tools_description, tool_names=self.tools_names
|
|
)
|
|
return result
|
|
|
|
def _check_tool_repeated_usage(
|
|
self, calling: Union[ToolCalling, InstructorToolCalling]
|
|
) -> bool:
|
|
if not self.tools_handler:
|
|
return False
|
|
if last_tool_usage := self.tools_handler.last_used_tool:
|
|
return (calling.tool_name == last_tool_usage.tool_name) and (
|
|
calling.arguments == last_tool_usage.arguments
|
|
)
|
|
return False
|
|
|
|
def _select_tool(self, tool_name: str) -> Any:
|
|
order_tools = sorted(
|
|
self.tools,
|
|
key=lambda tool: SequenceMatcher(
|
|
None, tool.name.lower().strip(), tool_name.lower().strip()
|
|
).ratio(),
|
|
reverse=True,
|
|
)
|
|
for tool in order_tools:
|
|
if (
|
|
tool.name.lower().strip() == tool_name.lower().strip()
|
|
or SequenceMatcher(
|
|
None, tool.name.lower().strip(), tool_name.lower().strip()
|
|
).ratio()
|
|
> 0.85
|
|
):
|
|
return tool
|
|
if self.task:
|
|
self.task.increment_tools_errors()
|
|
tool_selection_data: Dict[str, Any] = {
|
|
"agent_key": getattr(self.agent, "key", None) if self.agent else None,
|
|
"agent_role": getattr(self.agent, "role", None) if self.agent else None,
|
|
"tool_name": tool_name,
|
|
"tool_args": {},
|
|
"tool_class": self.tools_description,
|
|
}
|
|
if tool_name and tool_name != "":
|
|
error = f"Action '{tool_name}' don't exist, these are the only available Actions:\n{self.tools_description}"
|
|
crewai_event_bus.emit(
|
|
self,
|
|
ToolSelectionErrorEvent(
|
|
**tool_selection_data,
|
|
error=error,
|
|
),
|
|
)
|
|
raise Exception(error)
|
|
else:
|
|
error = f"I forgot the Action name, these are the only available Actions: {self.tools_description}"
|
|
crewai_event_bus.emit(
|
|
self,
|
|
ToolSelectionErrorEvent(
|
|
**tool_selection_data,
|
|
error=error,
|
|
),
|
|
)
|
|
raise Exception(error)
|
|
|
|
def _render(self) -> str:
|
|
"""Render the tool name and description in plain text."""
|
|
descriptions = []
|
|
for tool in self.tools:
|
|
descriptions.append(tool.description)
|
|
return "\n--\n".join(descriptions)
|
|
|
|
def _function_calling(
|
|
self, tool_string: str
|
|
) -> Union[ToolCalling, InstructorToolCalling]:
|
|
model = (
|
|
InstructorToolCalling
|
|
if self.function_calling_llm.supports_function_calling()
|
|
else ToolCalling
|
|
)
|
|
converter = Converter(
|
|
text=f"Only tools available:\n###\n{self._render()}\n\nReturn a valid schema for the tool, the tool name must be exactly equal one of the options, use this text to inform the valid output schema:\n\n### TEXT \n{tool_string}",
|
|
llm=self.function_calling_llm,
|
|
model=model,
|
|
instructions=dedent(
|
|
"""\
|
|
The schema should have the following structure, only two keys:
|
|
- tool_name: str
|
|
- arguments: dict (always a dictionary, with all arguments being passed)
|
|
|
|
Example:
|
|
{"tool_name": "tool name", "arguments": {"arg_name1": "value", "arg_name2": 2}}""",
|
|
),
|
|
max_attempts=1,
|
|
)
|
|
tool_object = converter.to_pydantic()
|
|
if not isinstance(tool_object, (ToolCalling, InstructorToolCalling)):
|
|
raise ToolUsageErrorException("Failed to parse tool calling")
|
|
|
|
return tool_object
|
|
|
|
def _original_tool_calling(
|
|
self, tool_string: str, raise_error: bool = False
|
|
) -> Union[ToolCalling, InstructorToolCalling, ToolUsageErrorException]:
|
|
tool_name = self.action.tool
|
|
tool = self._select_tool(tool_name)
|
|
try:
|
|
arguments = self._validate_tool_input(self.action.tool_input)
|
|
|
|
except Exception:
|
|
if raise_error:
|
|
raise
|
|
else:
|
|
return ToolUsageErrorException(
|
|
f"{self._i18n.errors('tool_arguments_error')}"
|
|
)
|
|
|
|
if not isinstance(arguments, dict):
|
|
if raise_error:
|
|
raise
|
|
else:
|
|
return ToolUsageErrorException(
|
|
f"{self._i18n.errors('tool_arguments_error')}"
|
|
)
|
|
|
|
return ToolCalling(
|
|
tool_name=tool.name,
|
|
arguments=arguments,
|
|
)
|
|
|
|
def _tool_calling(
|
|
self, tool_string: str
|
|
) -> Union[ToolCalling, InstructorToolCalling, ToolUsageErrorException]:
|
|
try:
|
|
try:
|
|
return self._original_tool_calling(tool_string, raise_error=True)
|
|
except Exception:
|
|
if self.function_calling_llm:
|
|
return self._function_calling(tool_string)
|
|
else:
|
|
return self._original_tool_calling(tool_string)
|
|
except Exception as e:
|
|
self._run_attempts += 1
|
|
if self._run_attempts > self._max_parsing_attempts:
|
|
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
|
|
if self.task:
|
|
self.task.increment_tools_errors()
|
|
if self.agent and self.agent.verbose:
|
|
self._printer.print(content=f"\n\n{e}\n", color="red")
|
|
return ToolUsageErrorException( # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling")
|
|
f"{self._i18n.errors('tool_usage_error').format(error=e)}\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
|
|
)
|
|
return self._tool_calling(tool_string)
|
|
|
|
def _validate_tool_input(self, tool_input: Optional[str]) -> Dict[str, Any]:
|
|
if tool_input is None:
|
|
return {}
|
|
|
|
if not isinstance(tool_input, str) or not tool_input.strip():
|
|
raise Exception(
|
|
"Tool input must be a valid dictionary in JSON or Python literal format"
|
|
)
|
|
|
|
# Attempt 1: Parse as JSON
|
|
try:
|
|
arguments = json.loads(tool_input)
|
|
if isinstance(arguments, dict):
|
|
return arguments
|
|
except (JSONDecodeError, TypeError):
|
|
pass # Continue to the next parsing attempt
|
|
|
|
# Attempt 2: Parse as Python literal
|
|
try:
|
|
arguments = ast.literal_eval(tool_input)
|
|
if isinstance(arguments, dict):
|
|
return arguments
|
|
except (ValueError, SyntaxError):
|
|
repaired_input = repair_json(tool_input)
|
|
pass # Continue to the next parsing attempt
|
|
|
|
# Attempt 3: Parse as JSON5
|
|
try:
|
|
arguments = json5.loads(tool_input)
|
|
if isinstance(arguments, dict):
|
|
return arguments
|
|
except (JSONDecodeError, ValueError, TypeError):
|
|
pass # Continue to the next parsing attempt
|
|
|
|
# Attempt 4: Repair JSON
|
|
try:
|
|
repaired_input = str(repair_json(tool_input, skip_json_loads=True))
|
|
self._printer.print(
|
|
content=f"Repaired JSON: {repaired_input}", color="blue"
|
|
)
|
|
arguments = json.loads(repaired_input)
|
|
if isinstance(arguments, dict):
|
|
return arguments
|
|
except Exception as e:
|
|
error = f"Failed to repair JSON: {e}"
|
|
self._printer.print(content=error, color="red")
|
|
|
|
error_message = (
|
|
"Tool input must be a valid dictionary in JSON or Python literal format"
|
|
)
|
|
self._emit_validate_input_error(error_message)
|
|
# If all parsing attempts fail, raise an error
|
|
raise Exception(error_message)
|
|
|
|
def _emit_validate_input_error(self, final_error: str):
|
|
tool_selection_data = {
|
|
"agent_key": getattr(self.agent, "key", None) if self.agent else None,
|
|
"agent_role": getattr(self.agent, "role", None) if self.agent else None,
|
|
"tool_name": self.action.tool,
|
|
"tool_args": str(self.action.tool_input),
|
|
"tool_class": self.__class__.__name__,
|
|
"agent": self.agent, # Adding agent for fingerprint extraction
|
|
}
|
|
|
|
# Include fingerprint context if available
|
|
if self.fingerprint_context:
|
|
tool_selection_data.update(self.fingerprint_context)
|
|
|
|
crewai_event_bus.emit(
|
|
self,
|
|
ToolValidateInputErrorEvent(**tool_selection_data, error=final_error),
|
|
)
|
|
|
|
def on_tool_error(
|
|
self,
|
|
tool: Any,
|
|
tool_calling: Union[ToolCalling, InstructorToolCalling],
|
|
e: Exception,
|
|
) -> None:
|
|
event_data = self._prepare_event_data(tool, tool_calling)
|
|
crewai_event_bus.emit(self, ToolUsageErrorEvent(**{**event_data, "error": e}))
|
|
|
|
def on_tool_use_finished(
|
|
self,
|
|
tool: Any,
|
|
tool_calling: Union[ToolCalling, InstructorToolCalling],
|
|
from_cache: bool,
|
|
started_at: float,
|
|
result: Any,
|
|
) -> None:
|
|
finished_at = time.time()
|
|
event_data = self._prepare_event_data(tool, tool_calling)
|
|
event_data.update(
|
|
{
|
|
"started_at": datetime.datetime.fromtimestamp(started_at),
|
|
"finished_at": datetime.datetime.fromtimestamp(finished_at),
|
|
"from_cache": from_cache,
|
|
"output": result,
|
|
}
|
|
)
|
|
crewai_event_bus.emit(self, ToolUsageFinishedEvent(**event_data))
|
|
|
|
def _prepare_event_data(
|
|
self, tool: Any, tool_calling: Union[ToolCalling, InstructorToolCalling]
|
|
) -> dict:
|
|
event_data = {
|
|
"run_attempts": self._run_attempts,
|
|
"delegations": self.task.delegations if self.task else 0,
|
|
"tool_name": tool.name,
|
|
"tool_args": tool_calling.arguments,
|
|
"tool_class": tool.__class__.__name__,
|
|
"agent_key": (
|
|
getattr(self.agent, "key", "unknown") if self.agent else "unknown"
|
|
),
|
|
"agent_role": (
|
|
getattr(self.agent, "_original_role", None)
|
|
or getattr(self.agent, "role", "unknown")
|
|
if self.agent
|
|
else "unknown"
|
|
),
|
|
}
|
|
|
|
# Include fingerprint context if available
|
|
if self.fingerprint_context:
|
|
event_data.update(self.fingerprint_context)
|
|
|
|
return event_data
|
|
|
|
def _add_fingerprint_metadata(self, arguments: dict) -> dict:
|
|
"""Add fingerprint metadata to tool arguments if available.
|
|
|
|
Args:
|
|
arguments: The original tool arguments
|
|
|
|
Returns:
|
|
Updated arguments dictionary with fingerprint metadata
|
|
"""
|
|
# Create a shallow copy to avoid modifying the original
|
|
arguments = arguments.copy()
|
|
|
|
# Add security metadata under a designated key
|
|
if "security_context" not in arguments:
|
|
arguments["security_context"] = {}
|
|
|
|
security_context = arguments["security_context"]
|
|
|
|
# Add agent fingerprint if available
|
|
if self.agent and hasattr(self.agent, "security_config"):
|
|
security_config = getattr(self.agent, "security_config", None)
|
|
if security_config and hasattr(security_config, "fingerprint"):
|
|
try:
|
|
security_context["agent_fingerprint"] = (
|
|
security_config.fingerprint.to_dict()
|
|
)
|
|
except AttributeError:
|
|
pass
|
|
|
|
# Add task fingerprint if available
|
|
if self.task and hasattr(self.task, "security_config"):
|
|
security_config = getattr(self.task, "security_config", None)
|
|
if security_config and hasattr(security_config, "fingerprint"):
|
|
try:
|
|
security_context["task_fingerprint"] = (
|
|
security_config.fingerprint.to_dict()
|
|
)
|
|
except AttributeError:
|
|
pass
|
|
|
|
return arguments
|