Crewating a tool output parser

This commit is contained in:
João Moura
2024-02-12 14:24:36 -08:00
parent aef083ee53
commit 46f7dc205e
7 changed files with 174 additions and 48 deletions

View File

@@ -1,12 +1,15 @@
from typing import Any, List
from typing import Any, List, Union
from langchain.output_parsers import PydanticOutputParser
import instructor
from langchain.prompts import PromptTemplate
from langchain_core.tools import BaseTool
from langchain_openai import ChatOpenAI
from openai import OpenAI
from crewai.agents.tools_handler import ToolsHandler
from crewai.telemtry import Telemetry
from crewai.tools.tool_calling import ToolCalling
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
from crewai.tools.tool_output_parser import ToolOutputParser
from crewai.utilities import I18N, Printer
@@ -44,7 +47,7 @@ class ToolUsage:
self._printer: Printer = Printer()
self._telemetry: Telemetry = Telemetry()
self._run_attempts: int = 1
self._max_parsing_attempts: int = 3
self._max_parsing_attempts: int = 2
self._remeber_format_after_usages: int = 3
self.tools_description = tools_description
self.tools_names = tools_names
@@ -62,38 +65,54 @@ class ToolUsage:
tool = self._select_tool(calling.function_name)
return self._use(tool_string=tool_string, tool=tool, calling=calling)
def _use(self, tool_string: str, tool: BaseTool, calling: ToolCalling) -> None:
try:
if self._check_tool_repeated_usage(calling=calling):
def _use(
self,
tool_string: str,
tool: BaseTool,
calling: Union[ToolCalling, InstructorToolCalling],
) -> None:
if self._check_tool_repeated_usage(calling=calling):
try:
result = self._i18n.errors("task_repeated_usage").format(
tool=calling.function_name, tool_input=calling.arguments
tool=calling.function_name,
tool_input=", ".join(calling.arguments.values()),
)
else:
self.tools_handler.on_tool_start(calling=calling)
result = self.tools_handler.cache.read(
tool=calling.function_name, input=calling.arguments
self._printer.print(content=f"\n\n{result}\n", color="yellow")
self._telemetry.tool_repeated_usage(
llm=self.llm, tool_name=tool.name, attempts=self._run_attempts
)
result = self._format_result(result=result)
return result
except Exception:
pass
if not result:
result = tool._run(**calling.arguments)
self.tools_handler.on_tool_end(calling=calling, output=result)
self.tools_handler.on_tool_start(calling=calling)
self._printer.print(content=f"\n\n{result}\n", color="yellow")
self._telemetry.tool_usage(
llm=self.llm, tool_name=tool.name, attempts=self._run_attempts
)
result = self.tools_handler.cache.read(
tool=calling.function_name, input=calling.arguments
)
result = self._format_result(result=result)
return result
except Exception:
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
self._telemetry.tool_usage_error(llm=self.llm)
return ToolUsageErrorException(
self._i18n.errors("tool_usage_error")
).message
return self.use(tool_string=tool_string)
if not result:
try:
result = tool._run(**calling.arguments)
except Exception as e:
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
self._telemetry.tool_usage_error(llm=self.llm)
return ToolUsageErrorException(
self._i18n.errors("tool_usage_exception").format(error=e)
).message
return self.use(tool_string=tool_string)
self.tools_handler.on_tool_end(calling=calling, output=result)
self._printer.print(content=f"\n\n{result}\n", color="yellow")
self._telemetry.tool_usage(
llm=self.llm, tool_name=tool.name, attempts=self._run_attempts
)
result = self._format_result(result=result)
return result
def _format_result(self, result: Any) -> None:
self.task.used_tools += 1
@@ -111,13 +130,15 @@ class ToolUsage:
)
return result
def _check_tool_repeated_usage(self, calling: ToolCalling) -> None:
def _check_tool_repeated_usage(
self, calling: Union[ToolCalling, InstructorToolCalling]
) -> None:
if last_tool_usage := self.tools_handler.last_used_tool:
return calling == last_tool_usage
def _select_tool(self, tool_name: str) -> BaseTool:
for tool in self.tools:
if tool.name == tool_name:
if tool.name.lower() == tool_name.lower():
return tool
raise Exception(f"Tool '{tool_name}' not found.")
@@ -132,27 +153,63 @@ class ToolUsage:
descriptions.append(
"\n".join(
[
f"Funtion Name: {tool.name}",
f"Funtion attributes: {args}",
f"Tool Name: {tool.name.lower()}",
f"Description: {tool.description}",
f"Tool Arguments: {args}",
]
)
)
return "\n--\n".join(descriptions)
def _tool_calling(self, tool_string: str) -> ToolCalling:
def _tool_calling(
self, tool_string: str
) -> Union[ToolCalling, InstructorToolCalling]:
try:
parser = PydanticOutputParser(pydantic_object=ToolCalling)
prompt = PromptTemplate(
template="Return a valid schema for the one tool you must use with its arguments and values.\n\nTools available:\n\n{available_tools}\n\nUse this text to inform a valid ouput schema:\n{tool_string}\n\n{format_instructions}\n```",
input_variables=["tool_string"],
partial_variables={
"available_tools": self._render(),
"format_instructions": parser.get_format_instructions(),
},
tool_string = tool_string.replace(
"Thought: Do I need to use a tool? Yes", ""
)
chain = prompt | self.llm | parser
calling = chain.invoke({"tool_string": tool_string})
tool_string = tool_string.replace("Action:", "Tool Name:")
tool_string = tool_string.replace("Action Input:", "Tool Arguments:")
if (isinstance(self.llm, ChatOpenAI)) and (
self.llm.openai_api_base == None
):
client = instructor.patch(
OpenAI(
base_url=self.llm.openai_api_base,
api_key=self.llm.openai_api_key,
),
mode=instructor.Mode.JSON,
)
calling = client.chat.completions.create(
model=self.llm.model_name,
messages=[
{
"role": "user",
"content": f"Tools available:\n\n{self._render()}\n\nReturn a valid schema for the tool, use this text to inform a valid ouput schema:\n{tool_string}```",
}
],
response_model=InstructorToolCalling,
)
else:
parser = ToolOutputParser(pydantic_object=ToolCalling)
prompt = PromptTemplate(
template="Tools available:\n\n{available_tools}\n\nReturn a valid schema for the tool, use this text to inform a valid ouput schema:\n{tool_string}\n\n{format_instructions}\n```",
input_variables=["tool_string"],
partial_variables={
"available_tools": self._render(),
"format_instructions": """
The schema should have the following structure, only two key:
- function_name: str
- arguments: dict (with all arguments being passed)
Example:
{"function_name": "function_name", "arguments": {"arg_name1": "value", "arg_name2": 2}}
""",
},
)
chain = prompt | self.llm | parser
calling = chain.invoke({"tool_string": tool_string})
except Exception:
self._run_attempts += 1