import json from typing import Dict, List, Optional from unittest.mock import MagicMock, Mock, patch import pytest from pydantic import BaseModel from crewai.llm import LLM from crewai.utilities.converter import ( Converter, ConverterError, convert_to_model, convert_with_instructions, create_converter, generate_model_description, get_conversion_instructions, handle_partial_json, validate_model, ) from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser # Sample Pydantic models for testing class EmailResponse(BaseModel): previous_message_content: str class EmailResponses(BaseModel): responses: list[EmailResponse] class SimpleModel(BaseModel): name: str age: int class NestedModel(BaseModel): id: int data: SimpleModel # Fixtures @pytest.fixture def mock_agent(): agent = Mock() agent.function_calling_llm = None agent.llm = Mock() return agent # Tests for convert_to_model def test_convert_to_model_with_valid_json(): result = '{"name": "John", "age": 30}' output = convert_to_model(result, SimpleModel, None, None) assert isinstance(output, SimpleModel) assert output.name == "John" assert output.age == 30 def test_convert_to_model_with_invalid_json(): result = '{"name": "John", "age": "thirty"}' with patch("crewai.utilities.converter.handle_partial_json") as mock_handle: mock_handle.return_value = "Fallback result" output = convert_to_model(result, SimpleModel, None, None) assert output == "Fallback result" def test_convert_to_model_with_no_model(): result = "Plain text" output = convert_to_model(result, None, None, None) assert output == "Plain text" def test_convert_to_model_with_special_characters(): json_string_test = """ { "responses": [ { "previous_message_content": "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on" } ] } """ output = convert_to_model(json_string_test, EmailResponses, None, None) assert isinstance(output, EmailResponses) assert len(output.responses) == 1 assert ( output.responses[0].previous_message_content == "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on" ) def test_convert_to_model_with_escaped_special_characters(): json_string_test = json.dumps( { "responses": [ { "previous_message_content": "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on" } ] } ) output = convert_to_model(json_string_test, EmailResponses, None, None) assert isinstance(output, EmailResponses) assert len(output.responses) == 1 assert ( output.responses[0].previous_message_content == "Hi Tom,\r\n\r\nNiamh has chosen the Mika phonics on" ) def test_convert_to_model_with_multiple_special_characters(): json_string_test = """ { "responses": [ { "previous_message_content": "Line 1\r\nLine 2\tTabbed\nLine 3\r\n\rEscaped newline" } ] } """ output = convert_to_model(json_string_test, EmailResponses, None, None) assert isinstance(output, EmailResponses) assert len(output.responses) == 1 assert ( output.responses[0].previous_message_content == "Line 1\r\nLine 2\tTabbed\nLine 3\r\n\rEscaped newline" ) # Tests for validate_model def test_validate_model_pydantic_output(): result = '{"name": "Alice", "age": 25}' output = validate_model(result, SimpleModel, False) assert isinstance(output, SimpleModel) assert output.name == "Alice" assert output.age == 25 def test_validate_model_json_output(): result = '{"name": "Bob", "age": 40}' output = validate_model(result, SimpleModel, True) assert isinstance(output, dict) assert output == {"name": "Bob", "age": 40} # Tests for handle_partial_json def test_handle_partial_json_with_valid_partial(): result = 'Some text {"name": "Charlie", "age": 35} more text' output = handle_partial_json(result, SimpleModel, False, None) assert isinstance(output, SimpleModel) assert output.name == "Charlie" assert output.age == 35 def test_handle_partial_json_with_invalid_partial(mock_agent): result = "No valid JSON here" with patch("crewai.utilities.converter.convert_with_instructions") as mock_convert: mock_convert.return_value = "Converted result" output = handle_partial_json(result, SimpleModel, False, mock_agent) assert output == "Converted result" # Tests for convert_with_instructions @patch("crewai.utilities.converter.create_converter") @patch("crewai.utilities.converter.get_conversion_instructions") def test_convert_with_instructions_success( mock_get_instructions, mock_create_converter, mock_agent ): mock_get_instructions.return_value = "Instructions" mock_converter = Mock() mock_converter.to_pydantic.return_value = SimpleModel(name="David", age=50) mock_create_converter.return_value = mock_converter result = "Some text to convert" output = convert_with_instructions(result, SimpleModel, False, mock_agent) assert isinstance(output, SimpleModel) assert output.name == "David" assert output.age == 50 @patch("crewai.utilities.converter.create_converter") @patch("crewai.utilities.converter.get_conversion_instructions") def test_convert_with_instructions_failure( mock_get_instructions, mock_create_converter, mock_agent ): mock_get_instructions.return_value = "Instructions" mock_converter = Mock() mock_converter.to_pydantic.return_value = ConverterError("Conversion failed") mock_create_converter.return_value = mock_converter result = "Some text to convert" with patch("crewai.utilities.converter.Printer") as mock_printer: output = convert_with_instructions(result, SimpleModel, False, mock_agent) assert output == result mock_printer.return_value.print.assert_called_once() # Tests for get_conversion_instructions def test_get_conversion_instructions_gpt(): mock_llm = Mock() mock_llm.openai_api_base = None with patch.object(LLM, "supports_function_calling") as supports_function_calling: supports_function_calling.return_value = True instructions = get_conversion_instructions(SimpleModel, mock_llm) model_schema = PydanticSchemaParser(model=SimpleModel).get_schema() assert ( instructions == f"I'm gonna convert this raw text into valid JSON.\n\nThe json should have the following structure, with the following keys:\n{model_schema}" ) def test_get_conversion_instructions_non_gpt(): mock_llm = Mock() with patch.object(LLM, "supports_function_calling") as supports_function_calling: supports_function_calling.return_value = False with patch("crewai.utilities.converter.PydanticSchemaParser") as mock_parser: mock_parser.return_value.get_schema.return_value = "Sample schema" instructions = get_conversion_instructions(SimpleModel, mock_llm) assert "Sample schema" in instructions # Tests for is_gpt def test_supports_function_calling_true(): llm = LLM(model="gpt-4o") assert llm.supports_function_calling() is True def test_supports_function_calling_false(): llm = LLM(model="non-existent-model") assert llm.supports_function_calling() is False class CustomConverter(Converter): pass def test_create_converter_with_mock_agent(): mock_agent = MagicMock() mock_agent.get_output_converter.return_value = MagicMock(spec=Converter) converter = create_converter( agent=mock_agent, llm=Mock(), text="Sample", model=SimpleModel, instructions="Convert", ) assert isinstance(converter, Converter) mock_agent.get_output_converter.assert_called_once() def test_create_converter_with_custom_converter(): converter = create_converter( converter_cls=CustomConverter, llm=Mock(), text="Sample", model=SimpleModel, instructions="Convert", ) assert isinstance(converter, CustomConverter) def test_create_converter_fails_without_agent_or_converter_cls(): with pytest.raises( ValueError, match="Either agent or converter_cls must be provided" ): create_converter( llm=Mock(), text="Sample", model=SimpleModel, instructions="Convert" ) def test_generate_model_description_simple_model(): description = generate_model_description(SimpleModel) expected_description = '{\n "name": str,\n "age": int\n}' assert description == expected_description def test_generate_model_description_nested_model(): description = generate_model_description(NestedModel) expected_description = ( '{\n "id": int,\n "data": {\n "name": str,\n "age": int\n}\n}' ) assert description == expected_description def test_generate_model_description_optional_field(): class ModelWithOptionalField(BaseModel): name: Optional[str] age: int description = generate_model_description(ModelWithOptionalField) expected_description = '{\n "name": Optional[str],\n "age": int\n}' assert description == expected_description def test_generate_model_description_list_field(): class ModelWithListField(BaseModel): items: List[int] description = generate_model_description(ModelWithListField) expected_description = '{\n "items": List[int]\n}' assert description == expected_description def test_generate_model_description_dict_field(): class ModelWithDictField(BaseModel): attributes: Dict[str, int] description = generate_model_description(ModelWithDictField) expected_description = '{\n "attributes": Dict[str, int]\n}' assert description == expected_description