diff --git a/src/crewai/task.py b/src/crewai/task.py index 8b1a05ca0..017d21083 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -1,6 +1,5 @@ import json import os -import re import threading import uuid from concurrent.futures import Future @@ -8,7 +7,6 @@ from copy import copy from hashlib import md5 from typing import Any, Dict, List, Optional, Tuple, Type, Union -from langchain_openai import ChatOpenAI from opentelemetry.trace import Span from pydantic import UUID4, BaseModel, Field, field_validator, model_validator from pydantic_core import PydanticCustomError @@ -17,10 +15,8 @@ from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.tasks.output_format import OutputFormat from crewai.tasks.task_output import TaskOutput from crewai.telemetry.telemetry import Telemetry -from crewai.utilities.converter import Converter, ConverterError +from crewai.utilities.converter import Converter, convert_to_model from crewai.utilities.i18n import I18N -from crewai.utilities.printer import Printer -from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser class Task(BaseModel): @@ -254,9 +250,7 @@ class Task(BaseModel): content = ( json_output if json_output - else pydantic_output.model_dump_json() - if pydantic_output - else result + else pydantic_output.model_dump_json() if pydantic_output else result ) self._save_file(content) @@ -326,18 +320,6 @@ class Task(BaseModel): return copied_task - def _create_converter(self, *args, **kwargs) -> Converter: - """Create a converter instance.""" - if self.agent and not self.converter_cls: - converter = self.agent.get_output_converter(*args, **kwargs) - elif self.converter_cls: - converter = self.converter_cls(*args, **kwargs) - - if not converter: - raise Exception("No output converter found or set.") - - return converter - def _export_output( self, result: str ) -> Tuple[Optional[BaseModel], Optional[Dict[str, Any]]]: @@ -345,75 +327,26 @@ class Task(BaseModel): json_output: Optional[Dict[str, Any]] = None if self.output_pydantic or self.output_json: - model_output = self._convert_to_model(result) - pydantic_output = ( - model_output if isinstance(model_output, BaseModel) else None + model_output = convert_to_model( + result, + self.output_pydantic, + self.output_json, + self.agent, + self.converter_cls, ) - if isinstance(model_output, str): + + if isinstance(model_output, BaseModel): + pydantic_output = model_output + elif isinstance(model_output, dict): + json_output = model_output + elif isinstance(model_output, str): try: json_output = json.loads(model_output) except json.JSONDecodeError: json_output = None - else: - json_output = model_output if isinstance(model_output, dict) else None return pydantic_output, json_output - def _convert_to_model(self, result: str) -> Union[dict, BaseModel, str]: - model = self.output_pydantic or self.output_json - if model is None: - return result - - try: - return self._validate_model(result, model) - except Exception: - return self._handle_partial_json(result, model) - - def _validate_model( - self, result: str, model: Type[BaseModel] - ) -> Union[dict, BaseModel]: - exported_result = model.model_validate_json(result) - if self.output_json: - return exported_result.model_dump() - return exported_result - - def _handle_partial_json( - self, result: str, model: Type[BaseModel] - ) -> Union[dict, BaseModel, str]: - match = re.search(r"({.*})", result, re.DOTALL) - if match: - try: - exported_result = model.model_validate_json(match.group(0)) - if self.output_json: - return exported_result.model_dump() - return exported_result - except Exception: - pass - - return self._convert_with_instructions(result, model) - - def _convert_with_instructions( - self, result: str, model: Type[BaseModel] - ) -> Union[dict, BaseModel, str]: - llm = self.agent.function_calling_llm or self.agent.llm # type: ignore # Item "None" of "BaseAgent | None" has no attribute "function_calling_llm" - instructions = self._get_conversion_instructions(model, llm) - - converter = self._create_converter( - llm=llm, text=result, model=model, instructions=instructions - ) - exported_result = ( - converter.to_pydantic() if self.output_pydantic else converter.to_json() - ) - - if isinstance(exported_result, ConverterError): - Printer().print( - content=f"{exported_result.message} Using raw output instead.", - color="red", - ) - return result - - return exported_result - def _get_output_format(self) -> OutputFormat: if self.output_json: return OutputFormat.JSON @@ -421,26 +354,6 @@ class Task(BaseModel): return OutputFormat.PYDANTIC return OutputFormat.RAW - def _get_conversion_instructions(self, model: Type[BaseModel], llm: Any) -> str: - instructions = "I'm gonna convert this raw text into valid JSON." - if not self._is_gpt(llm): - model_schema = PydanticSchemaParser(model=model).get_schema() - instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}" - return instructions - - def _save_output(self, content: str) -> None: - if not self.output_file: - raise Exception("Output file path is not set.") - - directory = os.path.dirname(self.output_file) - if directory and not os.path.exists(directory): - os.makedirs(directory) - with open(self.output_file, "w", encoding="utf-8") as file: - file.write(content) - - def _is_gpt(self, llm) -> bool: - return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None - def _save_file(self, result: Any) -> None: directory = os.path.dirname(self.output_file) # type: ignore # Value of type variable "AnyOrLiteralStr" of "dirname" cannot be "str | None" diff --git a/src/crewai/utilities/converter.py b/src/crewai/utilities/converter.py index f5732f62f..ec2bace7e 100644 --- a/src/crewai/utilities/converter.py +++ b/src/crewai/utilities/converter.py @@ -1,9 +1,14 @@ import json +import re +from typing import Any, Optional, Type, Union from langchain.schema import HumanMessage, SystemMessage from langchain_openai import ChatOpenAI +from pydantic import BaseModel, ValidationError from crewai.agents.agent_builder.utilities.base_output_converter import OutputConverter +from crewai.utilities.printer import Printer +from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser class ConverterError(Exception): @@ -72,3 +77,153 @@ class Converter(OutputConverter): def is_gpt(self) -> bool: """Return if llm provided is of gpt from openai.""" return isinstance(self.llm, ChatOpenAI) and self.llm.openai_api_base is None + + +def convert_to_model( + result: str, + output_pydantic: Optional[Type[BaseModel]], + output_json: Optional[Type[BaseModel]], + agent: Any, + converter_cls: Optional[Type[Converter]] = None, +) -> Union[dict, BaseModel, str]: + model = output_pydantic or output_json + if model is None: + return result + + try: + escaped_result = json.dumps(json.loads(result, strict=False)) + return validate_model(escaped_result, model, bool(output_json)) + except json.JSONDecodeError as e: + Printer().print( + content=f"Error parsing JSON: {e}. Attempting to handle partial JSON.", + color="yellow", + ) + return handle_partial_json( + result, model, bool(output_json), agent, converter_cls + ) + except ValidationError as e: + Printer().print( + content=f"Pydantic validation error: {e}. Attempting to handle partial JSON.", + color="yellow", + ) + return handle_partial_json( + result, model, bool(output_json), agent, converter_cls + ) + except Exception as e: + Printer().print( + content=f"Unexpected error during model conversion: {type(e).__name__}: {e}. Returning original result.", + color="red", + ) + return result + + +def validate_model( + result: str, model: Type[BaseModel], is_json_output: bool +) -> Union[dict, BaseModel]: + exported_result = model.model_validate_json(result) + if is_json_output: + return exported_result.model_dump() + return exported_result + + +def handle_partial_json( + result: str, + model: Type[BaseModel], + is_json_output: bool, + agent: Any, + converter_cls: Optional[Type[Converter]] = None, +) -> Union[dict, BaseModel, str]: + match = re.search(r"({.*})", result, re.DOTALL) + if match: + try: + exported_result = model.model_validate_json(match.group(0)) + if is_json_output: + return exported_result.model_dump() + return exported_result + except json.JSONDecodeError as e: + Printer().print( + content=f"Error parsing JSON: {e}. The extracted JSON-like string is not valid JSON. Attempting alternative conversion method.", + color="yellow", + ) + except ValidationError as e: + Printer().print( + content=f"Pydantic validation error: {e}. The JSON structure doesn't match the expected model. Attempting alternative conversion method.", + color="yellow", + ) + except Exception as e: + Printer().print( + content=f"Unexpected error during partial JSON handling: {type(e).__name__}: {e}. Attempting alternative conversion method.", + color="red", + ) + + return convert_with_instructions( + result, model, is_json_output, agent, converter_cls + ) + + +def convert_with_instructions( + result: str, + model: Type[BaseModel], + is_json_output: bool, + agent: Any, + converter_cls: Optional[Type[Converter]] = None, +) -> Union[dict, BaseModel, str]: + llm = agent.function_calling_llm or agent.llm + instructions = get_conversion_instructions(model, llm) + + converter = create_converter( + agent=agent, + converter_cls=converter_cls, + llm=llm, + text=result, + model=model, + instructions=instructions, + ) + exported_result = ( + converter.to_pydantic() if not is_json_output else converter.to_json() + ) + + if isinstance(exported_result, ConverterError): + Printer().print( + content=f"{exported_result.message} Using raw output instead.", + color="red", + ) + return result + + return exported_result + + +def get_conversion_instructions(model: Type[BaseModel], llm: Any) -> str: + instructions = "I'm gonna convert this raw text into valid JSON." + if not is_gpt(llm): + model_schema = PydanticSchemaParser(model=model).get_schema() + instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}" + return instructions + + +def is_gpt(llm: Any) -> bool: + from langchain_openai import ChatOpenAI + + return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None + + +def create_converter( + agent: Optional[Any] = None, + converter_cls: Optional[Type[Converter]] = None, + *args, + **kwargs, +) -> Converter: + if agent and not converter_cls: + if hasattr(agent, "get_output_converter"): + converter = agent.get_output_converter(*args, **kwargs) + else: + raise AttributeError("Agent does not have a 'get_output_converter' method") + elif converter_cls: + converter = converter_cls(*args, **kwargs) + else: + raise ValueError("Either agent or converter_cls must be provided") + + if not converter: + raise Exception("No output converter found or set.") + + return converter diff --git a/tests/task_test.py b/tests/task_test.py index 6458ea358..412e42dca 100644 --- a/tests/task_test.py +++ b/tests/task_test.py @@ -5,13 +5,12 @@ import json from unittest.mock import MagicMock, patch import pytest -from pydantic import BaseModel -from pydantic_core import ValidationError - from crewai import Agent, Crew, Process, Task from crewai.tasks.conditional_task import ConditionalTask from crewai.tasks.task_output import TaskOutput from crewai.utilities.converter import Converter +from pydantic import BaseModel +from pydantic_core import ValidationError def test_task_tool_reflect_agent_tools(): diff --git a/tests/utilities/test_converter.py b/tests/utilities/test_converter.py new file mode 100644 index 000000000..c6d5bcc1c --- /dev/null +++ b/tests/utilities/test_converter.py @@ -0,0 +1,266 @@ +import json +from unittest.mock import MagicMock, Mock, patch + +import pytest +from crewai.utilities.converter import ( + Converter, + ConverterError, + convert_to_model, + convert_with_instructions, + create_converter, + get_conversion_instructions, + handle_partial_json, + is_gpt, + validate_model, +) +from pydantic import BaseModel + + +# Sample Pydantic models for testing +class EmailResponse(BaseModel): + previous_message_content: str + + +class EmailResponses(BaseModel): + responses: list[EmailResponse] + + +class SimpleModel(BaseModel): + name: str + age: int + + +class NestedModel(BaseModel): + id: int + data: SimpleModel + + +# Fixtures +@pytest.fixture +def mock_agent(): + agent = Mock() + agent.function_calling_llm = None + agent.llm = Mock() + return agent + + +# Tests for convert_to_model +def test_convert_to_model_with_valid_json(): + result = '{"name": "John", "age": 30}' + output = convert_to_model(result, SimpleModel, None, None) + assert isinstance(output, SimpleModel) + assert output.name == "John" + assert output.age == 30 + + +def test_convert_to_model_with_invalid_json(): + result = '{"name": "John", "age": "thirty"}' + with patch("crewai.utilities.converter.handle_partial_json") as mock_handle: + mock_handle.return_value = "Fallback result" + output = convert_to_model(result, SimpleModel, None, None) + assert output == "Fallback result" + + +def test_convert_to_model_with_no_model(): + result = "Plain text" + output = convert_to_model(result, None, None, None) + assert output == "Plain text" + + +def test_convert_to_model_with_special_characters(): + json_string_test = """ + { + "responses": [ + { + "previous_message_content": "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on" + } + ] + } + """ + output = convert_to_model(json_string_test, EmailResponses, None, None) + assert isinstance(output, EmailResponses) + assert len(output.responses) == 1 + assert ( + output.responses[0].previous_message_content + == "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on" + ) + + +def test_convert_to_model_with_escaped_special_characters(): + json_string_test = json.dumps( + { + "responses": [ + { + "previous_message_content": "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on" + } + ] + } + ) + output = convert_to_model(json_string_test, EmailResponses, None, None) + assert isinstance(output, EmailResponses) + assert len(output.responses) == 1 + assert ( + output.responses[0].previous_message_content + == "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on" + ) + + +def test_convert_to_model_with_multiple_special_characters(): + json_string_test = """ + { + "responses": [ + { + "previous_message_content": "Line 1\r\nLine 2\tTabbed\nLine 3\r\n\rEscaped newline" + } + ] + } + """ + output = convert_to_model(json_string_test, EmailResponses, None, None) + assert isinstance(output, EmailResponses) + assert len(output.responses) == 1 + assert ( + output.responses[0].previous_message_content + == "Line 1\r\nLine 2\tTabbed\nLine 3\r\n\rEscaped newline" + ) + + +# Tests for validate_model +def test_validate_model_pydantic_output(): + result = '{"name": "Alice", "age": 25}' + output = validate_model(result, SimpleModel, False) + assert isinstance(output, SimpleModel) + assert output.name == "Alice" + assert output.age == 25 + + +def test_validate_model_json_output(): + result = '{"name": "Bob", "age": 40}' + output = validate_model(result, SimpleModel, True) + assert isinstance(output, dict) + assert output == {"name": "Bob", "age": 40} + + +# Tests for handle_partial_json +def test_handle_partial_json_with_valid_partial(): + result = 'Some text {"name": "Charlie", "age": 35} more text' + output = handle_partial_json(result, SimpleModel, False, None) + assert isinstance(output, SimpleModel) + assert output.name == "Charlie" + assert output.age == 35 + + +def test_handle_partial_json_with_invalid_partial(mock_agent): + result = "No valid JSON here" + with patch("crewai.utilities.converter.convert_with_instructions") as mock_convert: + mock_convert.return_value = "Converted result" + output = handle_partial_json(result, SimpleModel, False, mock_agent) + assert output == "Converted result" + + +# Tests for convert_with_instructions +@patch("crewai.utilities.converter.create_converter") +@patch("crewai.utilities.converter.get_conversion_instructions") +def test_convert_with_instructions_success( + mock_get_instructions, mock_create_converter, mock_agent +): + mock_get_instructions.return_value = "Instructions" + mock_converter = Mock() + mock_converter.to_pydantic.return_value = SimpleModel(name="David", age=50) + mock_create_converter.return_value = mock_converter + + result = "Some text to convert" + output = convert_with_instructions(result, SimpleModel, False, mock_agent) + + assert isinstance(output, SimpleModel) + assert output.name == "David" + assert output.age == 50 + + +@patch("crewai.utilities.converter.create_converter") +@patch("crewai.utilities.converter.get_conversion_instructions") +def test_convert_with_instructions_failure( + mock_get_instructions, mock_create_converter, mock_agent +): + mock_get_instructions.return_value = "Instructions" + mock_converter = Mock() + mock_converter.to_pydantic.return_value = ConverterError("Conversion failed") + mock_create_converter.return_value = mock_converter + + result = "Some text to convert" + with patch("crewai.utilities.converter.Printer") as mock_printer: + output = convert_with_instructions(result, SimpleModel, False, mock_agent) + assert output == result + mock_printer.return_value.print.assert_called_once() + + +# Tests for get_conversion_instructions +def test_get_conversion_instructions_gpt(): + mock_llm = Mock() + mock_llm.openai_api_base = None + with patch("crewai.utilities.converter.is_gpt", return_value=True): + instructions = get_conversion_instructions(SimpleModel, mock_llm) + assert instructions == "I'm gonna convert this raw text into valid JSON." + + +def test_get_conversion_instructions_non_gpt(): + mock_llm = Mock() + with patch("crewai.utilities.converter.is_gpt", return_value=False): + with patch("crewai.utilities.converter.PydanticSchemaParser") as mock_parser: + mock_parser.return_value.get_schema.return_value = "Sample schema" + instructions = get_conversion_instructions(SimpleModel, mock_llm) + assert "Sample schema" in instructions + + +# Tests for is_gpt +def test_is_gpt_true(): + from langchain_openai import ChatOpenAI + + mock_llm = Mock(spec=ChatOpenAI) + mock_llm.openai_api_base = None + assert is_gpt(mock_llm) is True + + +def test_is_gpt_false(): + mock_llm = Mock() + assert is_gpt(mock_llm) is False + + +class CustomConverter(Converter): + pass + + +def test_create_converter_with_mock_agent(): + mock_agent = MagicMock() + mock_agent.get_output_converter.return_value = MagicMock(spec=Converter) + + converter = create_converter( + agent=mock_agent, + llm=Mock(), + text="Sample", + model=SimpleModel, + instructions="Convert", + ) + + assert isinstance(converter, Converter) + mock_agent.get_output_converter.assert_called_once() + + +def test_create_converter_with_custom_converter(): + converter = create_converter( + converter_cls=CustomConverter, + llm=Mock(), + text="Sample", + model=SimpleModel, + instructions="Convert", + ) + + assert isinstance(converter, CustomConverter) + + +def test_create_converter_fails_without_agent_or_converter_cls(): + with pytest.raises( + ValueError, match="Either agent or converter_cls must be provided" + ): + create_converter( + llm=Mock(), text="Sample", model=SimpleModel, instructions="Convert" + )