adding new converter logic

This commit is contained in:
João Moura
2024-02-22 15:16:17 -03:00
parent e397a49c23
commit 1c7f9826b4
15 changed files with 6110 additions and 24065 deletions

View File

@@ -2,12 +2,14 @@ import threading
import uuid
from typing import Any, List, Optional, Type
from langchain_openai import ChatOpenAI
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError
from crewai.agent import Agent
from crewai.tasks.task_output import TaskOutput
from crewai.utilities import I18N, Instructor
from crewai.utilities import I18N, Converter, ConverterError, Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
class Task(BaseModel):
@@ -169,24 +171,43 @@ class Task(BaseModel):
return "\n".join(tasks_slices)
def _export_output(self, result: str) -> Any:
    """Convert the raw task result into the configured output format.

    When ``output_pydantic`` or ``output_json`` is set, the raw text is run
    through a ``Converter``. If the conversion fails, the failure message is
    printed and the raw text is used instead. When ``output_file`` is set,
    the exported content is also written through ``_save_file``.

    Args:
        result: Raw text produced for this task.

    Returns:
        The converted output, or ``result`` unchanged when no output format
        is configured or the conversion failed.
    """
    exported_result = result
    instructions = "I'm gonna convert this raw text into valid JSON."

    if self.output_pydantic or self.output_json:
        model = self.output_pydantic or self.output_json
        llm = self.agent.function_calling_llm or self.agent.llm

        # For LLMs that are not plain ChatOpenAI the expected structure is
        # spelled out in the instructions themselves.
        if not self._is_gpt(llm):
            model_schema = PydanticSchemaParser(model=model).get_schema()
            instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"

        converter = Converter(
            llm=llm, text=result, model=model, instructions=instructions
        )

        if self.output_pydantic:
            exported_result = converter.to_pydantic()
        elif self.output_json:
            exported_result = converter.to_json()

        # The converter returns (does not raise) ConverterError on failure.
        if isinstance(exported_result, ConverterError):
            Printer().print(
                content=f"{exported_result.message} Using raw output instead.",
                color="red",
            )
            exported_result = result

    if self.output_file:
        # Only serialize when a model instance was actually produced: after a
        # failed conversion exported_result is the raw string, which has no
        # .json() method.
        if self.output_pydantic and not isinstance(exported_result, str):
            content = exported_result.json()
        else:
            content = exported_result
        self._save_file(content)

    return exported_result
def _is_gpt(self, llm) -> bool:
    """Return True when ``llm`` is a ChatOpenAI with no custom API base set."""
    # Identity comparison is the idiomatic (PEP 8) test for None.
    return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
def _save_file(self, result: Any) -> None:
with open(self.output_file, "w") as file:

View File

@@ -1,15 +1,13 @@
from textwrap import dedent
from typing import Any, List, Union
from langchain.prompts import PromptTemplate
from langchain_core.tools import BaseTool
from langchain_openai import ChatOpenAI
from crewai.agents.tools_handler import ToolsHandler
from crewai.telemtry import Telemetry
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
from crewai.tools.tool_output_parser import ToolOutputParser
from crewai.utilities import I18N, Instructor, Printer
from crewai.utilities import I18N, Converter, ConverterError, Printer
# Model names this module treats as "bigger" OpenAI models.
# NOTE(review): the code that consults this list is outside this excerpt —
# confirm how it is matched (exact name vs. prefix) before extending it.
OPENAI_BIGGER_MODELS = ["gpt-4"]
@@ -189,52 +187,33 @@ class ToolUsage:
)
return "\n--\n".join(descriptions)
def _is_gpt(self, llm) -> bool:
    """Return True when ``llm`` is a ChatOpenAI with no custom API base set."""
    # Identity comparison is the idiomatic (PEP 8) test for None.
    return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
def _tool_calling(
self, tool_string: str
) -> Union[ToolCalling, InstructorToolCalling]:
try:
if (isinstance(self.llm, ChatOpenAI)) and (
self.llm.openai_api_base == None
):
instructor = Instructor(
llm=self.llm,
model=InstructorToolCalling,
content=f"Tools available:\n###\n{self._render()}\n\nReturn a valid schema for the tool, the tool name must be equal one of the options, use this text to inform a valid ouput schema:\n{tool_string}```",
instructions=dedent(
"""\
model = InstructorToolCalling if self._is_gpt(self.llm) else ToolCalling
converter = Converter(
text=f"Only tools available:\n###\n{self._render()}\n\nReturn a valid schema for the tool, the tool name must be exactly equal one of the options, use this text to inform the valid ouput schema:\n\n{tool_string}```",
llm=self.llm,
model=model,
instructions=dedent(
"""\
The schema should have the following structure, only two keys:
- tool_name: str
- arguments: dict (with all arguments being passed)
Example:
{"tool_name": "tool name", "arguments": {"arg_name1": "value", "arg_name2": 2}}
"""
),
)
calling = instructor.to_pydantic()
else:
parser = ToolOutputParser(pydantic_object=ToolCalling)
prompt = PromptTemplate(
template="Tools available:\n\n{available_tools}\n\nReturn a valid schema for the tool, the tool name must be equal one of the options, use this text to inform a valid ouput schema:\n{tool_string}\n\n{format_instructions}\n```",
input_variables=["tool_string"],
partial_variables={
"available_tools": self._render(),
"format_instructions": dedent(
"""\
The schema should have the following structure, only two keys:
- tool_name: str
- arguments: dict (with all arguments being passed)
Example:
{"tool_name": "tool_name", "arguments": {"arg_name1": "value", "arg_name2": 2}}
"""
),
},
)
chain = prompt | self.llm | parser
calling = chain.invoke({"tool_string": tool_string})
{"tool_name": "tool name", "arguments": {"arg_name1": "value", "arg_name2": 2}}""",
),
max_attemps=1,
)
calling = converter.to_pydantic()
if isinstance(calling, ConverterError):
raise calling
except Exception as e:
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:

View File

@@ -1,3 +1,4 @@
from .converter import Converter, ConverterError
from .i18n import I18N
from .instructor import Instructor
from .logger import Logger

View File

@@ -0,0 +1,84 @@
import json
from typing import Any, Optional
from langchain.schema import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field, PrivateAttr, model_validator
from crewai.utilities import Instructor
from crewai.utilities.crew_pydantic_output_parser import CrewPydanticOutputParser
class ConverterError(Exception):
    """Signals that a Converter could not parse its input.

    Attributes:
        message: Human-readable description of the failure; it is also the
            first positional argument handed to ``Exception``.
    """

    def __init__(self, message: str, *args: object) -> None:
        self.message = message
        super().__init__(message, *args)
class Converter(BaseModel):
    """Class that converts text into either pydantic or json.

    A ChatOpenAI LLM with no custom API base is handled through
    ``Instructor``; every other LLM goes through a prompt | llm | parser
    chain built by ``_create_chain``. Conversion failures are returned as a
    ``ConverterError`` instance rather than raised.
    """

    # Kept under a different name than the `_is_gpt` method: when the flag
    # and the method share one name the method shadows the flag, so the
    # truthiness check always succeeds and the chain path is never taken.
    _use_instructor: bool = PrivateAttr(default=True)
    text: str = Field(description="Text to be converted.")
    llm: Any = Field(description="The language model to be used to convert the text.")
    model: Any = Field(description="The model to be used to convert the text.")
    instructions: str = Field(description="Conversion instructions to the LLM.")
    max_attemps: Optional[int] = Field(
        description="Max number of attemps to try to get the output formated.",
        default=3,
    )

    @model_validator(mode="after")
    def check_llm_provider(self):
        """Record which conversion path the configured LLM should use."""
        self._use_instructor = self._is_gpt(self.llm)
        # "after" model validators are expected to return the instance.
        return self

    def to_pydantic(self, current_attempt=1):
        """Convert text to pydantic.

        Args:
            current_attempt: 1-based attempt counter used by the retry logic.

        Returns:
            The converted object, or a ``ConverterError`` once every attempt
            has failed.
        """
        try:
            if self._use_instructor:
                return self._create_instructor().to_pydantic()
            return self._create_chain().invoke({})
        except Exception as e:
            # max_attemps is Optional; treat None as a single attempt.
            if current_attempt < (self.max_attemps or 1):
                return self.to_pydantic(current_attempt + 1)
            return ConverterError(
                f"Failed to convert text into a pydantic model due to the following error: {e}"
            )

    def to_json(self, current_attempt=1):
        """Convert text to json.

        Args:
            current_attempt: 1-based attempt counter used by the retry logic.

        Returns:
            The converted output, or a ``ConverterError`` once every attempt
            has failed.
        """
        try:
            if self._use_instructor:
                return self._create_instructor().to_json()
            return json.dumps(self._create_chain().invoke({}).model_dump())
        except Exception:
            # max_attemps is Optional; treat None as a single attempt.
            if current_attempt < (self.max_attemps or 1):
                return self.to_json(current_attempt + 1)
            return ConverterError("Failed to convert text into JSON.")

    def _create_instructor(self):
        """Create an instructor configured from this converter's fields."""
        inst = Instructor(
            llm=self.llm,
            max_attemps=self.max_attemps,
            model=self.model,
            content=self.text,
            instructions=self.instructions,
        )
        return inst

    def _create_chain(self):
        """Create a chain: prompt messages piped into the LLM, then the parser."""
        parser = CrewPydanticOutputParser(pydantic_object=self.model)
        new_prompt = HumanMessage(content=self.text) + SystemMessage(
            content=self.instructions
        )
        return new_prompt | self.llm | parser

    def _is_gpt(self, llm) -> bool:
        """Return True when ``llm`` is a ChatOpenAI with no custom API base set."""
        return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None

View File

@@ -0,0 +1,43 @@
import json
from typing import Any, List, Type, Union
import regex
from langchain.output_parsers import PydanticOutputParser
from langchain_core.exceptions import OutputParserException
from langchain_core.outputs import Generation
from langchain_core.pydantic_v1 import ValidationError
from pydantic import BaseModel
from pydantic.v1 import BaseModel as V1BaseModel
class CrewPydanticOutputParser(PydanticOutputParser):
    """Parses the text into pydantic models.

    Before delegating to the parent parser, the generation text is reduced
    to the first JSON object found in it that ``json.loads`` accepts.
    """

    pydantic_object: Union[Type[BaseModel], Type[V1BaseModel]]

    def parse_result(self, result: List[Generation], *, partial: bool = False) -> Any:
        """Clean the first generation's text and parse it into ``pydantic_object``.

        Args:
            result: Generations to parse; the text of the first one is
                rewritten in place with the cleaned JSON.
            partial: Accepted for interface compatibility; not used here.

        Raises:
            OutputParserException: If the parsed object fails validation.
        """
        result[0].text = self._transform_in_valid_json(result[0].text)
        json_object = super().parse_result(result)
        try:
            return self.pydantic_object.parse_obj(json_object)
        except ValidationError as e:
            name = self.pydantic_object.__name__
            msg = f"Failed to parse {name} from completion {json_object}. Got: {e}"
            # Chain the cause so the validation details stay in the traceback.
            raise OutputParserException(msg, llm_output=json_object) from e

    def _transform_in_valid_json(self, text) -> str:
        """Return the first parseable JSON object in ``text``, re-serialized.

        Falls back to the fence-stripped text when no candidate parses.
        """
        # Remove only markdown fence markers (``` or ```json). Replacing every
        # "json" substring would also corrupt keys and values that contain it.
        text = regex.sub(r"```(?:json)?", "", text)
        # Recursive pattern ((?R)) so nested braces are matched as one object.
        json_pattern = r"\{(?:[^{}]|(?R))*\}"
        for match in regex.finditer(json_pattern, text):
            try:
                # Return the first successfully parsed JSON object.
                return json.dumps(json.loads(match.group()))
            except json.JSONDecodeError:
                # If parsing fails, skip to the next match.
                continue
        return text

View File

@@ -47,5 +47,4 @@ class Instructor(BaseModel):
model = self._client.chat.completions.create(
model=self.llm.model_name, response_model=self.model, messages=messages
)
return model

View File

@@ -0,0 +1,38 @@
from typing import Type, get_args, get_origin
from pydantic import BaseModel
class PydanticSchemaParser(BaseModel):
    """Render the fields of a pydantic model as an indented text outline."""

    model: Type[BaseModel]

    def get_schema(self) -> str:
        """
        Public method to get the schema of the configured Pydantic model.

        :return: String representation of the model schema.
        """
        return self._get_model_schema(self.model)

    def _get_model_schema(self, model, depth=0) -> str:
        """Return one ``- name: type`` line per field, indented 4 spaces per depth."""
        indent = " " * 4 * depth
        lines = [
            f"{indent}- {field_name}: {self._get_field_type(field, depth + 1)}"
            for field_name, field in model.model_fields.items()
        ]
        return "\n".join(lines)

    def _get_field_type(self, field, depth) -> str:
        """Describe a field's annotation, expanding nested models and lists."""
        field_type = field.annotation
        if get_origin(field_type) is list:
            item_args = get_args(field_type)
            if not item_args:
                # Bare ``List`` carries no item type to describe.
                return "List"
            list_item_type = item_args[0]
            if self._is_model(list_item_type):
                nested_schema = self._get_model_schema(list_item_type, depth + 1)
                return f"List[\n{nested_schema}\n{' ' * 4 * depth}]"
            return f"List[{self._type_name(list_item_type)}]"
        if self._is_model(field_type):
            return f"\n{self._get_model_schema(field_type, depth)}"
        return self._type_name(field_type)

    def _is_model(self, candidate) -> bool:
        """Return True when ``candidate`` is a BaseModel subclass."""
        # issubclass raises TypeError for annotations that are not classes
        # (Optional[...], Dict[...], unions, ...); treat those as non-models.
        try:
            return issubclass(candidate, BaseModel)
        except TypeError:
            return False

    def _type_name(self, annotation) -> str:
        """Return a readable name for a non-model annotation."""
        # Plain classes keep their short name; parametrized or special typing
        # forms fall back to str() so their arguments are not lost.
        if get_origin(annotation) is None and hasattr(annotation, "__name__"):
            return annotation.__name__
        return str(annotation)