mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-21 13:58:15 +00:00
adding new converter logic
This commit is contained in:
@@ -2,12 +2,14 @@ import threading
|
||||
import uuid
|
||||
from typing import Any, List, Optional, Type
|
||||
|
||||
from langchain_openai import ChatOpenAI
|
||||
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
|
||||
from pydantic_core import PydanticCustomError
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.utilities import I18N, Instructor
|
||||
from crewai.utilities import I18N, Converter, ConverterError, Printer
|
||||
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
|
||||
|
||||
|
||||
class Task(BaseModel):
|
||||
@@ -169,24 +171,43 @@ class Task(BaseModel):
|
||||
return "\n".join(tasks_slices)
|
||||
|
||||
def _export_output(self, result: str) -> Any:
|
||||
exported_result = result
|
||||
instructions = "I'm gonna convert this raw text into valid JSON."
|
||||
|
||||
if self.output_pydantic or self.output_json:
|
||||
model = self.output_pydantic or self.output_json
|
||||
instructor = Instructor(
|
||||
agent=self.agent,
|
||||
content=result,
|
||||
model=model,
|
||||
llm = self.agent.function_calling_llm or self.agent.llm
|
||||
|
||||
if not self._is_gpt(llm):
|
||||
model_schema = PydanticSchemaParser(model=model).get_schema()
|
||||
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
|
||||
|
||||
converter = Converter(
|
||||
llm=llm, text=result, model=model, instructions=instructions
|
||||
)
|
||||
|
||||
if self.output_pydantic:
|
||||
result = instructor.to_pydantic()
|
||||
exported_result = converter.to_pydantic()
|
||||
elif self.output_json:
|
||||
result = instructor.to_json()
|
||||
exported_result = converter.to_json()
|
||||
|
||||
if isinstance(exported_result, ConverterError):
|
||||
Printer().print(
|
||||
content=f"{exported_result.message} Using raw output instead.",
|
||||
color="red",
|
||||
)
|
||||
exported_result = result
|
||||
|
||||
if self.output_file:
|
||||
content = result if not self.output_pydantic else result.json()
|
||||
content = (
|
||||
exported_result if not self.output_pydantic else exported_result.json()
|
||||
)
|
||||
self._save_file(content)
|
||||
|
||||
return result
|
||||
return exported_result
|
||||
|
||||
def _is_gpt(self, llm) -> bool:
|
||||
return isinstance(llm, ChatOpenAI) and llm.openai_api_base == None
|
||||
|
||||
def _save_file(self, result: Any) -> None:
|
||||
with open(self.output_file, "w") as file:
|
||||
|
||||
@@ -1,15 +1,13 @@
|
||||
from textwrap import dedent
|
||||
from typing import Any, List, Union
|
||||
|
||||
from langchain.prompts import PromptTemplate
|
||||
from langchain_core.tools import BaseTool
|
||||
from langchain_openai import ChatOpenAI
|
||||
|
||||
from crewai.agents.tools_handler import ToolsHandler
|
||||
from crewai.telemtry import Telemetry
|
||||
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
|
||||
from crewai.tools.tool_output_parser import ToolOutputParser
|
||||
from crewai.utilities import I18N, Instructor, Printer
|
||||
from crewai.utilities import I18N, Converter, ConverterError, Printer
|
||||
|
||||
OPENAI_BIGGER_MODELS = ["gpt-4"]
|
||||
|
||||
@@ -189,52 +187,33 @@ class ToolUsage:
|
||||
)
|
||||
return "\n--\n".join(descriptions)
|
||||
|
||||
def _is_gpt(self, llm) -> bool:
|
||||
return isinstance(llm, ChatOpenAI) and llm.openai_api_base == None
|
||||
|
||||
def _tool_calling(
|
||||
self, tool_string: str
|
||||
) -> Union[ToolCalling, InstructorToolCalling]:
|
||||
try:
|
||||
if (isinstance(self.llm, ChatOpenAI)) and (
|
||||
self.llm.openai_api_base == None
|
||||
):
|
||||
instructor = Instructor(
|
||||
llm=self.llm,
|
||||
model=InstructorToolCalling,
|
||||
content=f"Tools available:\n###\n{self._render()}\n\nReturn a valid schema for the tool, the tool name must be equal one of the options, use this text to inform a valid ouput schema:\n{tool_string}```",
|
||||
instructions=dedent(
|
||||
"""\
|
||||
model = InstructorToolCalling if self._is_gpt(self.llm) else ToolCalling
|
||||
converter = Converter(
|
||||
text=f"Only tools available:\n###\n{self._render()}\n\nReturn a valid schema for the tool, the tool name must be exactly equal one of the options, use this text to inform the valid ouput schema:\n\n{tool_string}```",
|
||||
llm=self.llm,
|
||||
model=model,
|
||||
instructions=dedent(
|
||||
"""\
|
||||
The schema should have the following structure, only two keys:
|
||||
- tool_name: str
|
||||
- arguments: dict (with all arguments being passed)
|
||||
|
||||
Example:
|
||||
{"tool_name": "tool name", "arguments": {"arg_name1": "value", "arg_name2": 2}}
|
||||
"""
|
||||
),
|
||||
)
|
||||
calling = instructor.to_pydantic()
|
||||
|
||||
else:
|
||||
parser = ToolOutputParser(pydantic_object=ToolCalling)
|
||||
prompt = PromptTemplate(
|
||||
template="Tools available:\n\n{available_tools}\n\nReturn a valid schema for the tool, the tool name must be equal one of the options, use this text to inform a valid ouput schema:\n{tool_string}\n\n{format_instructions}\n```",
|
||||
input_variables=["tool_string"],
|
||||
partial_variables={
|
||||
"available_tools": self._render(),
|
||||
"format_instructions": dedent(
|
||||
"""\
|
||||
The schema should have the following structure, only two keys:
|
||||
- tool_name: str
|
||||
- arguments: dict (with all arguments being passed)
|
||||
|
||||
Example:
|
||||
{"tool_name": "tool_name", "arguments": {"arg_name1": "value", "arg_name2": 2}}
|
||||
"""
|
||||
),
|
||||
},
|
||||
)
|
||||
chain = prompt | self.llm | parser
|
||||
calling = chain.invoke({"tool_string": tool_string})
|
||||
{"tool_name": "tool name", "arguments": {"arg_name1": "value", "arg_name2": 2}}""",
|
||||
),
|
||||
max_attemps=1,
|
||||
)
|
||||
calling = converter.to_pydantic()
|
||||
|
||||
if isinstance(calling, ConverterError):
|
||||
raise calling
|
||||
except Exception as e:
|
||||
self._run_attempts += 1
|
||||
if self._run_attempts > self._max_parsing_attempts:
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from .converter import Converter, ConverterError
|
||||
from .i18n import I18N
|
||||
from .instructor import Instructor
|
||||
from .logger import Logger
|
||||
|
||||
84
src/crewai/utilities/converter.py
Normal file
84
src/crewai/utilities/converter.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import json
|
||||
from typing import Any, Optional
|
||||
|
||||
from langchain.schema import HumanMessage, SystemMessage
|
||||
from langchain_openai import ChatOpenAI
|
||||
from pydantic import BaseModel, Field, PrivateAttr, model_validator
|
||||
|
||||
from crewai.utilities import Instructor
|
||||
from crewai.utilities.crew_pydantic_output_parser import CrewPydanticOutputParser
|
||||
|
||||
|
||||
class ConverterError(Exception):
|
||||
"""Error raised when Converter fails to parse the input."""
|
||||
|
||||
def __init__(self, message: str, *args: object) -> None:
|
||||
super().__init__(message, *args)
|
||||
self.message = message
|
||||
|
||||
|
||||
class Converter(BaseModel):
|
||||
"""Class that converts text into either pydantic or json."""
|
||||
|
||||
_is_gpt: bool = PrivateAttr(default=True)
|
||||
text: str = Field(description="Text to be converted.")
|
||||
llm: Any = Field(description="The language model to be used to convert the text.")
|
||||
model: Any = Field(description="The model to be used to convert the text.")
|
||||
instructions: str = Field(description="Conversion instructions to the LLM.")
|
||||
max_attemps: Optional[int] = Field(
|
||||
description="Max number of attemps to try to get the output formated.",
|
||||
default=3,
|
||||
)
|
||||
|
||||
@model_validator(mode="after")
|
||||
def check_llm_provider(self):
|
||||
if not self._is_gpt(self.llm):
|
||||
self._is_gpt = False
|
||||
|
||||
def to_pydantic(self, current_attempt=1):
|
||||
"""Convert text to pydantic."""
|
||||
try:
|
||||
if self._is_gpt:
|
||||
return self._create_instructor().to_pydantic()
|
||||
else:
|
||||
return self._create_chain().invoke({})
|
||||
except Exception as e:
|
||||
if current_attempt < self.max_attemps:
|
||||
return self.to_pydantic(current_attempt + 1)
|
||||
return ConverterError(
|
||||
f"Failed to convert text into a pydantic model due to the following error: {e}"
|
||||
)
|
||||
|
||||
def to_json(self, current_attempt=1):
|
||||
"""Convert text to json."""
|
||||
try:
|
||||
if self._is_gpt:
|
||||
return self._create_instructor().to_json()
|
||||
else:
|
||||
return json.dumps(self._create_chain().invoke({}).model_dump())
|
||||
except Exception:
|
||||
if current_attempt < self.max_attemps:
|
||||
return self.to_json(current_attempt + 1)
|
||||
return ConverterError("Failed to convert text into JSON.")
|
||||
|
||||
def _create_instructor(self):
|
||||
"""Create an instructor."""
|
||||
inst = Instructor(
|
||||
llm=self.llm,
|
||||
max_attemps=self.max_attemps,
|
||||
model=self.model,
|
||||
content=self.text,
|
||||
instructions=self.instructions,
|
||||
)
|
||||
return inst
|
||||
|
||||
def _create_chain(self):
|
||||
"""Create a chain."""
|
||||
parser = CrewPydanticOutputParser(pydantic_object=self.model)
|
||||
new_prompt = HumanMessage(content=self.text) + SystemMessage(
|
||||
content=self.instructions
|
||||
)
|
||||
return new_prompt | self.llm | parser
|
||||
|
||||
def _is_gpt(self, llm) -> bool:
|
||||
return isinstance(llm, ChatOpenAI) and llm.openai_api_base == None
|
||||
43
src/crewai/utilities/crew_pydantic_output_parser.py
Normal file
43
src/crewai/utilities/crew_pydantic_output_parser.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import json
|
||||
from typing import Any, List, Type, Union
|
||||
|
||||
import regex
|
||||
from langchain.output_parsers import PydanticOutputParser
|
||||
from langchain_core.exceptions import OutputParserException
|
||||
from langchain_core.outputs import Generation
|
||||
from langchain_core.pydantic_v1 import ValidationError
|
||||
from pydantic import BaseModel
|
||||
from pydantic.v1 import BaseModel as V1BaseModel
|
||||
|
||||
|
||||
class CrewPydanticOutputParser(PydanticOutputParser):
|
||||
"""Parses the text into pydantic models"""
|
||||
|
||||
pydantic_object: Union[Type[BaseModel], Type[V1BaseModel]]
|
||||
|
||||
def parse_result(self, result: List[Generation], *, partial: bool = False) -> Any:
|
||||
result[0].text = self._transform_in_valid_json(result[0].text)
|
||||
json_object = super().parse_result(result)
|
||||
try:
|
||||
return self.pydantic_object.parse_obj(json_object)
|
||||
except ValidationError as e:
|
||||
name = self.pydantic_object.__name__
|
||||
msg = f"Failed to parse {name} from completion {json_object}. Got: {e}"
|
||||
raise OutputParserException(msg, llm_output=json_object)
|
||||
|
||||
def _transform_in_valid_json(self, text) -> str:
|
||||
text = text.replace("```", "").replace("json", "")
|
||||
json_pattern = r"\{(?:[^{}]|(?R))*\}"
|
||||
matches = regex.finditer(json_pattern, text)
|
||||
|
||||
for match in matches:
|
||||
try:
|
||||
# Attempt to parse the matched string as JSON
|
||||
json_obj = json.loads(match.group())
|
||||
# Return the first successfully parsed JSON object
|
||||
json_obj = json.dumps(json_obj)
|
||||
return str(json_obj)
|
||||
except json.JSONDecodeError:
|
||||
# If parsing fails, skip to the next match
|
||||
continue
|
||||
return text
|
||||
@@ -47,5 +47,4 @@ class Instructor(BaseModel):
|
||||
model = self._client.chat.completions.create(
|
||||
model=self.llm.model_name, response_model=self.model, messages=messages
|
||||
)
|
||||
|
||||
return model
|
||||
|
||||
38
src/crewai/utilities/pydantic_schema_parser.py
Normal file
38
src/crewai/utilities/pydantic_schema_parser.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from typing import Type, get_args, get_origin
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class PydanticSchemaParser(BaseModel):
|
||||
model: Type[BaseModel]
|
||||
|
||||
def get_schema(self) -> str:
|
||||
"""
|
||||
Public method to get the schema of a Pydantic model.
|
||||
|
||||
:param model: The Pydantic model class to generate schema for.
|
||||
:return: String representation of the model schema.
|
||||
"""
|
||||
return self._get_model_schema(self.model)
|
||||
|
||||
def _get_model_schema(self, model, depth=0) -> str:
|
||||
lines = []
|
||||
for field_name, field in model.model_fields.items():
|
||||
field_type_str = self._get_field_type(field, depth + 1)
|
||||
lines.append(f"{' ' * 4 * depth}- {field_name}: {field_type_str}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _get_field_type(self, field, depth) -> str:
|
||||
field_type = field.annotation
|
||||
if get_origin(field_type) is list:
|
||||
list_item_type = get_args(field_type)[0]
|
||||
if issubclass(list_item_type, BaseModel):
|
||||
nested_schema = self._get_model_schema(list_item_type, depth + 1)
|
||||
return f"List[\n{nested_schema}\n{' ' * 4 * depth}]"
|
||||
else:
|
||||
return f"List[{list_item_type.__name__}]"
|
||||
elif issubclass(field_type, BaseModel):
|
||||
return f"\n{self._get_model_schema(field_type, depth)}"
|
||||
else:
|
||||
return field_type.__name__
|
||||
Reference in New Issue
Block a user