Revamping tool usage

João Moura
2024-02-10 10:36:34 -08:00
parent d0b0a33be3
commit 00206a62ab
22 changed files with 1233 additions and 584 deletions

View File

@@ -3,9 +3,9 @@ from typing import Any, List, Optional
from langchain.agents.agent import RunnableAgent
from langchain.agents.format_scratchpad import format_log_to_str
from langchain.agents.output_parsers import ReActSingleInputOutputParser
from langchain.memory import ConversationSummaryMemory
from langchain.tools.render import render_text_description
from langchain_core.runnables.config import RunnableConfig
from langchain_openai import ChatOpenAI
from pydantic import (
UUID4,
@@ -19,12 +19,7 @@ from pydantic import (
)
from pydantic_core import PydanticCustomError
from crewai.agents import (
CacheHandler,
CrewAgentExecutor,
CrewAgentOutputParser,
ToolsHandler,
)
from crewai.agents import CacheHandler, CrewAgentExecutor, ToolsHandler
from crewai.utilities import I18N, Logger, Prompts, RPMController
@@ -158,8 +153,7 @@ class Agent(BaseModel):
"input": task,
"tool_names": self.__tools_names(tools),
"tools": render_text_description(tools),
},
RunnableConfig(callbacks=[self.tools_handler]),
}
)["output"]
if self.max_rpm:
@@ -200,12 +194,14 @@ class Agent(BaseModel):
"agent_scratchpad": lambda x: format_log_to_str(x["intermediate_steps"]),
}
executor_args = {
"llm": self.llm,
"i18n": self.i18n,
"tools": self.tools,
"verbose": self.verbose,
"handle_parsing_errors": True,
"max_iterations": self.max_iter,
"step_callback": self.step_callback,
"tools_handler": self.tools_handler,
}
if self._rpm_controller:
@@ -231,14 +227,7 @@ class Agent(BaseModel):
bind = self.llm.bind(stop=[self.i18n.slice("observation")])
inner_agent = (
agent_args
| execution_prompt
| bind
| CrewAgentOutputParser(
tools_handler=self.tools_handler,
cache=self.cache_handler,
i18n=self.i18n,
)
agent_args | execution_prompt | bind | ReActSingleInputOutputParser()
)
self.agent_executor = CrewAgentExecutor(
agent=RunnableAgent(runnable=inner_agent), **executor_args
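
With the custom CrewAgentOutputParser gone, the inner agent pipeline now ends in LangChain's stock ReActSingleInputOutputParser; cache hits and repeated-usage checks move into ToolUsage/ToolsHandler instead of living in the parser. A minimal sketch of what the stock parser handles on its own (the sample completions below are made up, not crewAI prompt output):

from langchain.agents.output_parsers import ReActSingleInputOutputParser
from langchain_core.agents import AgentAction, AgentFinish

parser = ReActSingleInputOutputParser()

# A tool call is parsed into an AgentAction(tool, tool_input, log)
action = parser.parse(
    "Thought: I should look this up\n"
    "Action: search\n"
    "Action Input: temperature in SF"
)
assert isinstance(action, AgentAction) and action.tool == "search"

# A final answer is parsed into an AgentFinish
finish = parser.parse(
    "Thought: I now know the final answer\n"
    "Final Answer: The temperature is 100 degrees"
)
assert isinstance(finish, AgentFinish)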

View File

@@ -1,4 +1,3 @@
from .cache.cache_handler import CacheHandler
from .executor import CrewAgentExecutor
from .output_parser import CrewAgentOutputParser
from .tools_handler import ToolsHandler

View File

@@ -1,2 +1 @@
from .cache_handler import CacheHandler
from .cache_hit import CacheHit

View File

@@ -10,9 +10,7 @@ class CacheHandler:
self._cache = {}
def add(self, tool, input, output):
input = input.strip()
self._cache[f"{tool}-{input}"] = output
def read(self, tool, input) -> Optional[str]:
input = input.strip()
return self._cache.get(f"{tool}-{input}")
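
For reference, the cache contract the rest of the commit relies on: results are stored under a plain "{tool}-{input}" key, and ToolUsage now passes the parsed argument dict as the input. A hypothetical usage, assuming crewai is installed (values are made up):

from crewai.agents import CacheHandler

cache = CacheHandler()
cache.add(tool="search", input="temperature in SF", output="100 degrees")

# Same tool + same input -> cache hit; anything else misses
assert cache.read(tool="search", input="temperature in SF") == "100 degrees"
assert cache.read(tool="search", input="temperature in NY") is None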

View File

@@ -1,18 +0,0 @@
from typing import Any
from pydantic import BaseModel, Field
from .cache_handler import CacheHandler
class CacheHit(BaseModel):
"""Cache Hit Object."""
class Config:
arbitrary_types_allowed = True
# Making it Any instead of AgentAction to avoind
# pydantic v1 vs v2 incompatibility, langchain should
# soon be updated to pydantic v2
action: Any = Field(description="Action taken")
cache: CacheHandler = Field(description="Cache Handler for the tool")

View File

@@ -1,30 +0,0 @@
from langchain_core.exceptions import OutputParserException
from crewai.utilities import I18N
class TaskRepeatedUsageException(OutputParserException):
"""Exception raised when a task is used twice in a roll."""
i18n: I18N = I18N()
error: str = "TaskRepeatedUsageException"
message: str
def __init__(self, i18n: I18N, tool: str, tool_input: str, text: str):
self.i18n = i18n
self.text = text
self.tool = tool
self.tool_input = tool_input
self.message = self.i18n.errors("task_repeated_usage").format(
tool=tool, tool_input=tool_input
)
super().__init__(
error=self.error,
observation=self.message,
send_to_llm=True,
llm_output=self.text,
)
def __str__(self):
return self.message

View File

@@ -10,16 +10,19 @@ from langchain_core.exceptions import OutputParserException
from langchain_core.pydantic_v1 import root_validator
from langchain_core.tools import BaseTool
from langchain_core.utils.input import get_color_mapping
from pydantic import InstanceOf
from crewai.agents.cache.cache_hit import CacheHit
from crewai.tools.cache_tools import CacheTools
from crewai.agents.tools_handler import ToolsHandler
from crewai.tools.tool_usage import ToolUsage
from crewai.utilities import I18N
class CrewAgentExecutor(AgentExecutor):
i18n: I18N = I18N()
llm: Any = None
iterations: int = 0
request_within_rpm_limit: Any = None
tools_handler: InstanceOf[ToolsHandler] = None
max_iterations: Optional[int] = 15
force_answer_max_iterations: Optional[int] = None
step_callback: Optional[Any] = None
@@ -32,11 +35,6 @@ class CrewAgentExecutor(AgentExecutor):
def _should_force_answer(self) -> bool:
return True if self.iterations == self.force_answer_max_iterations else False
def _force_answer(self, output: AgentAction):
return AgentStep(
action=output, observation=self.i18n.errors("force_final_answer")
)
def _call(
self,
inputs: Dict[str, str],
@@ -110,16 +108,17 @@ class CrewAgentExecutor(AgentExecutor):
callbacks=run_manager.get_child() if run_manager else None,
**inputs,
)
if self._should_force_answer():
if isinstance(output, AgentAction) or isinstance(output, AgentFinish):
output = output
elif isinstance(output, CacheHit):
output = output.action
else:
raise ValueError(
f"Unexpected output type from agent: {type(output)}"
)
yield self._force_answer(output)
yield AgentStep(
action=output, observation=self.i18n.errors("force_final_answer")
)
return
except OutputParserException as e:
@@ -160,7 +159,9 @@ class CrewAgentExecutor(AgentExecutor):
)
if self._should_force_answer():
yield self._force_answer(output)
yield AgentStep(
action=output, observation=self.i18n.errors("force_final_answer")
)
return
yield AgentStep(action=output, observation=observation)
@@ -171,17 +172,6 @@ class CrewAgentExecutor(AgentExecutor):
yield output
return
# Override tool usage to use CacheTools
if isinstance(output, CacheHit):
cache = output.cache
action = output.action
tool = CacheTools(cache_handler=cache).tool()
output = action.copy()
output.tool_input = f"tool:{action.tool}|input:{action.tool_input}"
output.tool = tool.name
name_to_tool_map[tool.name] = tool
color_mapping[tool.name] = color_mapping[action.tool]
actions: List[AgentAction]
actions = [output] if isinstance(output, AgentAction) else output
yield from actions
@@ -192,18 +182,13 @@ class CrewAgentExecutor(AgentExecutor):
if agent_action.tool in name_to_tool_map:
tool = name_to_tool_map[agent_action.tool]
return_direct = tool.return_direct
color = color_mapping[agent_action.tool]
color_mapping[agent_action.tool]
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
if return_direct:
tool_run_kwargs["llm_prefix"] = ""
# We then call the tool on the tool input to get an observation
observation = tool.run(
agent_action.tool_input,
verbose=self.verbose,
color=color,
callbacks=run_manager.get_child() if run_manager else None,
**tool_run_kwargs,
)
observation = ToolUsage(
tools_handler=self.tools_handler, tools=self.tools, llm=self.llm
).use(agent_action.log)
else:
tool_run_kwargs = self.agent.tool_run_logging_kwargs()
observation = InvalidTool().run(
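
The key change in the executor: tool execution no longer goes through tool.run() with the parsed tool_input. Instead, the raw ReAct log is handed to ToolUsage, which re-parses the call with the LLM, checks the cache via ToolsHandler, runs the tool, and records telemetry. A sketch of that call path, assuming the crewAI classes introduced in this commit (the wrapper function is illustrative, not part of the codebase):

from typing import Any, List

from langchain_core.agents import AgentAction
from langchain_core.tools import BaseTool

from crewai.agents.tools_handler import ToolsHandler
from crewai.tools.tool_usage import ToolUsage


def perform_tool_call(
    agent_action: AgentAction,
    tools_handler: ToolsHandler,
    tools: List[BaseTool],
    llm: Any,
) -> str:
    # agent_action.log carries the full "Thought / Action / Action Input" text,
    # which ToolUsage turns into a ToolCalling before dispatching the tool.
    return ToolUsage(tools_handler=tools_handler, tools=tools, llm=llm).use(
        agent_action.log
    )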

View File

@@ -1,79 +0,0 @@
import re
from typing import Union
from langchain.agents.output_parsers import ReActSingleInputOutputParser
from langchain_core.agents import AgentAction, AgentFinish
from crewai.agents.cache import CacheHandler, CacheHit
from crewai.agents.exceptions import TaskRepeatedUsageException
from crewai.agents.tools_handler import ToolsHandler
from crewai.utilities import I18N
FINAL_ANSWER_ACTION = "Final Answer:"
FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE = (
"Parsing LLM output produced both a final answer and a parse-able action:"
)
class CrewAgentOutputParser(ReActSingleInputOutputParser):
"""Parses ReAct-style LLM calls that have a single tool input.
Expects output to be in one of two formats.
If the output signals that an action should be taken,
should be in the below format. This will result in an AgentAction
being returned.
```
Thought: agent thought here
Action: search
Action Input: what is the temperature in SF?
```
If the output signals that a final answer should be given,
should be in the below format. This will result in an AgentFinish
being returned.
```
Thought: agent thought here
Final Answer: The temperature is 100 degrees
```
It also prevents tools from being reused in a roll.
"""
class Config:
arbitrary_types_allowed = True
tools_handler: ToolsHandler
cache: CacheHandler
i18n: I18N
def parse(self, text: str) -> Union[AgentAction, AgentFinish, CacheHit]:
regex = (
r"Action\s*\d*\s*:[\s]*(.*?)[\s]*Action\s*\d*\s*Input\s*\d*\s*:[\s]*(.*)"
)
if action_match := re.search(regex, text, re.DOTALL):
action = action_match.group(1).strip()
action_input = action_match.group(2)
tool_input = action_input.strip(" ")
tool_input = tool_input.strip('"')
if last_tool_usage := self.tools_handler.last_used_tool:
usage = {
"tool": action,
"input": tool_input,
}
if usage == last_tool_usage:
raise TaskRepeatedUsageException(
text=text,
tool=action,
tool_input=tool_input,
i18n=self.i18n,
)
if self.cache.read(action, tool_input):
action = AgentAction(action, tool_input, text)
return CacheHit(action=action, cache=self.cache)
return super().parse(text)

View File

@@ -1,44 +1,30 @@
from typing import Any, Dict
from langchain.callbacks.base import BaseCallbackHandler
from typing import Any
from ..tools.cache_tools import CacheTools
from ..tools.tool_calling import ToolCalling
from .cache.cache_handler import CacheHandler
class ToolsHandler(BaseCallbackHandler):
class ToolsHandler:
"""Callback handler for tool usage."""
last_used_tool: Dict[str, Any] = {}
last_used_tool: ToolCalling = {}
cache: CacheHandler
def __init__(self, cache: CacheHandler, **kwargs: Any):
def __init__(self, cache: CacheHandler):
"""Initialize the callback handler."""
self.cache = cache
super().__init__(**kwargs)
self.last_used_tool = {}
def on_tool_start(
self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
) -> Any:
def on_tool_start(self, calling: ToolCalling) -> Any:
"""Run when tool starts running."""
name = serialized.get("name")
if name not in ["invalid_tool", "_Exception"]:
tools_usage = {
"tool": name,
"input": input_str,
}
self.last_used_tool = tools_usage
self.last_used_tool = calling
def on_tool_end(self, output: str, **kwargs: Any) -> Any:
def on_tool_end(self, calling: ToolCalling, output: str) -> Any:
"""Run when tool ends running."""
if (
"is not a valid tool" not in output
and "Invalid or incomplete response" not in output
and "Invalid Format" not in output
):
if self.last_used_tool["tool"] != CacheTools().name:
self.cache.add(
tool=self.last_used_tool["tool"],
input=self.last_used_tool["input"],
output=output,
)
if self.last_used_tool.function_name != CacheTools().name:
self.cache.add(
tool=calling.function_name,
input=calling.arguments,
output=output,
)
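
ToolsHandler is no longer a LangChain BaseCallbackHandler wired in through RunnableConfig; ToolUsage now calls it directly with a ToolCalling around each run. A hypothetical round trip, assuming crewai is installed and that the cache accepts dict inputs after this change (tool name and arguments are made up):

from crewai.agents import CacheHandler, ToolsHandler
from crewai.tools.tool_calling import ToolCalling

handler = ToolsHandler(cache=CacheHandler())
calling = ToolCalling(function_name="search", arguments={"query": "temperature in SF"})

handler.on_tool_start(calling=calling)                      # remembered as last_used_tool
handler.on_tool_end(calling=calling, output="100 degrees")  # result cached under name + arguments

assert handler.last_used_tool == calling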

View File

@@ -107,6 +107,7 @@ class Crew(BaseModel):
self._logger = Logger(self.verbose)
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
self._telemetry = Telemetry()
self._telemetry.set_tracer()
self._telemetry.crew_creation(self)
return self

View File

@@ -2,6 +2,7 @@ import json
import os
import platform
import socket
from typing import Any
import pkg_resources
from opentelemetry import trace
@@ -42,16 +43,18 @@ class Telemetry:
try:
telemetry_endpoint = "http://telemetry.crewai.com:4318"
self.resource = Resource(attributes={SERVICE_NAME: "crewAI-telemetry"})
provider = TracerProvider(resource=self.resource)
self.provider = TracerProvider(resource=self.resource)
processor = BatchSpanProcessor(
OTLPSpanExporter(endpoint=f"{telemetry_endpoint}/v1/traces")
)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
self.provider.add_span_processor(processor)
self.ready = True
except Exception:
pass
def set_tracer(self):
trace.set_tracer_provider(self.provider)
def crew_creation(self, crew):
"""Records the creation of a crew."""
if self.ready:
@@ -116,6 +119,36 @@ class Telemetry:
except Exception:
pass
def tool_usage(self, llm: Any, tool_name: str, attempts: int):
"""Records the usage of a tool by an agent."""
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage")
self._add_attribute(span, "tool_name", tool_name)
self._add_attribute(span, "attempts", attempts)
self._add_attribute(
span, "llm", json.dumps(self._safe_llm_attributes(llm))
)
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass
def tool_usage_error(self, llm: Any):
"""Records the usage of a tool by an agent."""
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage Error")
self._add_attribute(
span, "llm", json.dumps(self._safe_llm_attributes(llm))
)
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass
def crew_execution_span(self, crew):
"""Records the complete execution of a crew.
This is only collected if the user has opted-in to share the crew.
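
Telemetry now keeps the TracerProvider on self and defers the global trace.set_tracer_provider() call to the new set_tracer() method (invoked from Crew's setup above), so constructing extra Telemetry instances, e.g. inside ToolUsage, does not re-register the provider. The two new spans record tool usage and tool-parsing errors. A minimal sketch of the span pattern they follow, using the OpenTelemetry API directly (attribute values are made up; no exporter is attached, so nothing is sent):

from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace import Status, StatusCode

provider = TracerProvider(resource=Resource(attributes={SERVICE_NAME: "crewAI-telemetry"}))
trace.set_tracer_provider(provider)  # what Telemetry.set_tracer() now does lazily

tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage")
span.set_attribute("tool_name", "search")
span.set_attribute("attempts", 1)
span.set_status(Status(StatusCode.OK))
span.end()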

View File

@@ -1,6 +1,6 @@
from typing import List
from langchain.tools import Tool
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
from crewai.agent import Agent
@@ -15,14 +15,14 @@ class AgentTools(BaseModel):
def tools(self):
return [
Tool.from_function(
StructuredTool.from_function(
func=self.delegate_work,
name="Delegate work to co-worker",
description=self.i18n.tools("delegate_work").format(
coworkers=", ".join([agent.role for agent in self.agents])
),
),
Tool.from_function(
StructuredTool.from_function(
func=self.ask_question,
name="Ask question to co-worker",
description=self.i18n.tools("ask_question").format(
@@ -31,24 +31,16 @@ class AgentTools(BaseModel):
),
]
def delegate_work(self, command):
def delegate_work(self, coworker: str, task: str, context: str):
"""Useful to delegate a specific task to a coworker."""
return self._execute(command)
return self._execute(coworker, task, context)
def ask_question(self, command):
def ask_question(self, coworker: str, question: str, context: str):
"""Useful to ask a question, opinion or take from a coworker."""
return self._execute(command)
return self._execute(coworker, question, context)
def _execute(self, command):
def _execute(self, agent, task, context):
"""Execute the command."""
try:
agent, task, context = command.split("|")
except ValueError:
return self.i18n.errors("agent_tool_missing_param")
if not agent or not task or not context:
return self.i18n.errors("agent_tool_missing_param")
agent = [
available_agent
for available_agent in self.agents
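
Switching from Tool to StructuredTool lets delegate_work and ask_question declare coworker/task/context as separate typed arguments, replacing the old pipe-delimited command string and its command.split("|") parsing. A self-contained sketch of the mechanism (the function below is a stand-in, not crewAI's):

from langchain.tools import StructuredTool


def delegate_work(coworker: str, task: str, context: str) -> str:
    """Useful to delegate a specific task to a coworker."""
    return f"{coworker} will handle: {task} (context: {context})"


tool = StructuredTool.from_function(
    func=delegate_work,
    name="Delegate work to co-worker",
    description="Delegate a specific task to one of the co-workers.",
)

# The argument schema is inferred from the signature, so the agent can pass
# structured fields instead of a single "coworker|task|context" string.
print(sorted(tool.args))  # ['context', 'coworker', 'task']
print(tool.run({"coworker": "Researcher",
                "task": "summarize findings",
                "context": "use the Q1 usage report"}))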

View File

@@ -1,4 +1,4 @@
from langchain.tools import Tool
from langchain.tools import StructuredTool
from pydantic import BaseModel, ConfigDict, Field
from crewai.agents.cache import CacheHandler
@@ -15,7 +15,7 @@ class CacheTools(BaseModel):
)
def tool(self):
return Tool.from_function(
return StructuredTool.from_function(
func=self.hit_cache,
name=self.name,
description="Reads directly from the cache",

View File

@@ -0,0 +1,12 @@
from typing import Any, Dict
from pydantic.v1 import BaseModel, Field
class ToolCalling(BaseModel):
function_name: str = Field(
..., description="The name of the function to be called."
)
arguments: Dict[str, Any] = Field(
..., description="A dictinary of arguments to be passed to the function."
)
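
ToolCalling is declared with pydantic.v1, presumably so it can be used as the pydantic_object of LangChain's PydanticOutputParser in the new ToolUsage class, which at this point still expects v1-style models. A hypothetical instance (field values are made up):

from crewai.tools.tool_calling import ToolCalling

calling = ToolCalling(
    function_name="Delegate work to co-worker",
    arguments={
        "coworker": "Researcher",
        "task": "Summarize the findings",
        "context": "Everything gathered so far",
    },
)
print(calling.function_name, calling.arguments)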

View File

@@ -0,0 +1,112 @@
from typing import Any, List
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain_core.tools import BaseTool
from crewai.agents.tools_handler import ToolsHandler
from crewai.telemtry import Telemetry
from crewai.tools.tool_calling import ToolCalling
from crewai.utilities import I18N, Printer
class ToolUsage:
"""
Class that represents the usage of a tool by an agent.
Attributes:
tools_handler: Tools handler that will manage the tool usage.
tools: List of tools available for the agent.
llm: Language model to be used for the tool usage.
"""
def __init__(
self, tools_handler: ToolsHandler, tools: List[BaseTool], llm: Any
) -> None:
self._i18n: I18N = I18N()
self._printer: Printer = Printer()
self._telemetry: Telemetry = Telemetry()
self._run_attempts: int = 1
self._max_parsing_attempts: int = 3
self.tools_handler = tools_handler
self.tools = tools
self.llm = llm
def use(self, tool_string: str):
calling = self._tool_calling(tool_string)
tool = self._select_tool(calling.function_name)
return self._use(tool=tool, calling=calling)
def _use(self, tool: BaseTool, calling: ToolCalling) -> None:
if self._check_tool_repeated_usage(calling=calling):
result = self._i18n.errors("task_repeated_usage").format(
tool=calling.function_name, tool_input=calling.arguments
)
else:
self.tools_handler.on_tool_start(calling=calling)
result = self.tools_handler.cache.read(
tool=calling.function_name, input=calling.arguments
)
if not result:
result = tool._run(**calling.arguments)
self.tools_handler.on_tool_end(calling=calling, output=result)
self._printer.print(content=f"\n\n{result}\n", color="yellow")
self._telemetry.tool_usage(
llm=self.llm, tool_name=tool.name, attempts=self._run_attempts
)
return result
def _check_tool_repeated_usage(self, calling: ToolCalling) -> None:
if last_tool_usage := self.tools_handler.last_used_tool:
return calling == last_tool_usage
def _select_tool(self, tool_name: str) -> BaseTool:
for tool in self.tools:
if tool.name == tool_name:
return tool
raise Exception(f"Tool '{tool_name}' not found.")
def _render(self) -> str:
"""Render the tool name and description in plain text."""
descriptions = []
for tool in self.tools:
args = {
k: {k2: v2 for k2, v2 in v.items() if k2 in ["description", "type"]}
for k, v in tool.args.items()
}
descriptions.append(
"\n".join(
[
f"Funtion Name: {tool.name}",
f"Funtion attributes: {args}",
f"Description: {tool.description}",
]
)
)
return "\n--\n".join(descriptions)
def _tool_calling(self, tool_string: str) -> ToolCalling:
try:
parser = PydanticOutputParser(pydantic_object=ToolCalling)
prompt = PromptTemplate(
template="Return a valid schema for the one tool you must use with its arguments and values.\n\nTools available:\n\n{available_tools}\n\nUse this text to inform a valid ouput schema:\n{tool_string}\n\n{format_instructions}\n```",
input_variables=["tool_string"],
partial_variables={
"available_tools": self._render(),
"format_instructions": parser.get_format_instructions(),
},
)
chain = prompt | self.llm | parser
calling = chain.invoke({"tool_string": tool_string})
except Exception as e:
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
self._telemetry.tool_usage_error(llm=self.llm)
raise e
return self._tool_calling(tool_string)
return calling
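
The heart of _tool_calling is a small prompt | llm | parser chain that coerces the agent's free-form tool text into a ToolCalling. A self-contained sketch of that chain with a stand-in LLM so it runs offline; the local model mirror, tool description, and fake completion are illustrative, and a real run pipes the prompt into the agent's chat model instead:

from typing import Any, Dict

from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda
from pydantic.v1 import BaseModel, Field


class ToolCalling(BaseModel):  # local mirror of crewai.tools.tool_calling.ToolCalling
    function_name: str = Field(..., description="The name of the function to be called.")
    arguments: Dict[str, Any] = Field(..., description="A dictionary of arguments to be passed to the function.")


parser = PydanticOutputParser(pydantic_object=ToolCalling)
prompt = PromptTemplate(
    template=(
        "Return a valid schema for the one tool you must use with its arguments and values.\n\n"
        "Tools available:\n\n{available_tools}\n\n"
        "Use this text to inform a valid output schema:\n{tool_string}\n\n"
        "{format_instructions}"
    ),
    input_variables=["tool_string"],
    partial_variables={
        "available_tools": "Function Name: search\nFunction attributes: {'query': {'type': 'string'}}",
        "format_instructions": parser.get_format_instructions(),
    },
)

# Stand-in for self.llm so the sketch runs without an API call.
fake_llm = RunnableLambda(
    lambda _: '{"function_name": "search", "arguments": {"query": "temperature in SF"}}'
)

chain = prompt | fake_llm | parser
calling = chain.invoke({"tool_string": "Action: search\nAction Input: temperature in SF"})
print(calling.function_name, calling.arguments)  # search {'query': 'temperature in SF'}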

View File

@@ -16,7 +16,7 @@
"errors": {
"force_final_answer": "Στην πραγματικότητα, χρησιμοποίησα πάρα πολλά εργαλεία, οπότε θα σταματήσω τώρα και θα σας δώσω την απόλυτη ΚΑΛΥΤΕΡΗ τελική μου απάντηση ΤΩΡΑ, χρησιμοποιώντας την αναμενόμενη μορφή: ```\nΣκέφτηκα: Χρειάζεται να χρησιμοποιήσω ένα εργαλείο; Όχι\nΤελική απάντηση: [η απάντησή σας εδώ]```",
"agent_tool_unexsiting_coworker": "\nΣφάλμα κατά την εκτέλεση του εργαλείου. Ο συνάδελφος που αναφέρεται στο Ενέργεια προς εισαγωγή δεν βρέθηκε, πρέπει να είναι μία από τις ακόλουθες επιλογές: {coworkers}.\n",
"task_repeated_usage": "Μόλις χρησιμοποίησα το {tool} εργαλείο με είσοδο {tool_input}. Άρα ξέρω ήδη το αποτέλεσμα αυτού και δεν χρειάζεται να το χρησιμοποιήσω τώρα.\n"
"task_repeated_usage": "Μόλις χρησιμοποίησα το εργαλείο {tool} με είσοδο {tool_input}. Άρα ξέρω ήδη το αποτέλεσμα αυτού και δεν χρειάζεται να το χρησιμοποιήσω ξανά τώρα.\n"
},
"tools": {
"delegate_work": "Αναθέστε μια συγκεκριμένη εργασία σε έναν από τους παρακάτω συναδέλφους: {coworkers}. Η είσοδος σε αυτό το εργαλείο θα πρέπει να είναι ο ρόλος του συναδέλφου, η εργασία που θέλετε να κάνει και ΟΛΟ το απαραίτητο πλαίσιο για την εκτέλεση της εργασίας, δεν γνωρίζουν τίποτα για την εργασία, επομένως μοιραστείτε απολύτως όλα όσα γνωρίζετε, μην αναφέρετε πράγματα, αλλά αντί να τους εξηγήσεις.",

View File

@@ -16,7 +16,7 @@
"errors": {
"force_final_answer": "Actually, I used too many tools, so I'll stop now and give you my absolute BEST Final answer NOW, using the expected format: ```\nThought: Do I need to use a tool? No\nFinal Answer: [your response here]```",
"agent_tool_unexsiting_coworker": "\nError executing tool. Co-worker mentioned on the Action Input not found, it must to be one of the following options: {coworkers}.\n",
"task_repeated_usage": "I just used the {tool} tool with input {tool_input}. So I already know the result of that and don't need to use it now.\n"
"task_repeated_usage": "I just used the {tool} tool with input {tool_input}. So I already know the result of that and don't need to use it again now.\n"
},
"tools": {
"delegate_work": "Delegate a specific task to one of the following co-workers: {coworkers}. The input to this tool should be the role of the coworker, the task you want them to do, and ALL necessary context to exectue the task, they know nothing about the task, so share absolute everything you know, don't reference things but instead explain them.",