diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index 38860352b..be245e14e 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -47,7 +47,7 @@ from crewai.tasks.task_output import TaskOutput from crewai.tools.base_tool import BaseTool from crewai.utilities.config import process_config from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified -from crewai.utilities.converter import Converter, convert_to_model +from crewai.utilities.converter import Converter, aconvert_to_model, convert_to_model from crewai.utilities.file_store import ( clear_task_files, get_all_files, @@ -602,7 +602,7 @@ class Task(BaseModel): json_output = None elif not self._guardrails and not self._guardrail: raw = result - pydantic_output, json_output = self._export_output(result) + pydantic_output, json_output = await self._aexport_output(result) else: raw = result pydantic_output, json_output = None, None @@ -1040,6 +1040,34 @@ Follow these guidelines: return pydantic_output, json_output + async def _aexport_output( + self, result: str + ) -> tuple[BaseModel | None, dict[str, Any] | None]: + """Async version of _export_output that uses async LLM calls.""" + pydantic_output: BaseModel | None = None + json_output: dict[str, Any] | None = None + + if self.output_pydantic or self.output_json: + model_output = await aconvert_to_model( + result, + self.output_pydantic, + self.output_json, + self.agent, + self.converter_cls, + ) + + if isinstance(model_output, BaseModel): + pydantic_output = model_output + elif isinstance(model_output, dict): + json_output = model_output + elif isinstance(model_output, str): + try: + json_output = json.loads(model_output) + except json.JSONDecodeError: + json_output = None + + return pydantic_output, json_output + def _get_output_format(self) -> OutputFormat: if self.output_json: return OutputFormat.JSON @@ -1255,7 +1283,7 @@ Follow these guidelines: if isinstance(guardrail_result.result, str): task_output.raw = guardrail_result.result - pydantic_output, json_output = self._export_output( + pydantic_output, json_output = await self._aexport_output( guardrail_result.result ) task_output.pydantic = pydantic_output @@ -1300,7 +1328,7 @@ Follow these guidelines: tools=tools, ) - pydantic_output, json_output = self._export_output(result) + pydantic_output, json_output = await self._aexport_output(result) task_output = TaskOutput( name=self.name or self.description, description=self.description, diff --git a/lib/crewai/src/crewai/utilities/converter.py b/lib/crewai/src/crewai/utilities/converter.py index 67f542d53..562b549fd 100644 --- a/lib/crewai/src/crewai/utilities/converter.py +++ b/lib/crewai/src/crewai/utilities/converter.py @@ -142,6 +142,103 @@ class Converter(OutputConverter): return self.to_json(current_attempt + 1) return ConverterError(f"Failed to convert text into JSON, error: {e}.") + async def ato_pydantic(self, current_attempt: int = 1) -> BaseModel: + """Async version of to_pydantic. Convert text to pydantic without blocking the event loop. + + Args: + current_attempt: The current attempt number for conversion retries. + + Returns: + A Pydantic BaseModel instance. + + Raises: + ConverterError: If conversion fails after maximum attempts. + """ + try: + if self.llm.supports_function_calling(): + response = await self.llm.acall( + messages=[ + {"role": "system", "content": self.instructions}, + {"role": "user", "content": self.text}, + ], + response_model=self.model, + ) + if isinstance(response, BaseModel): + result = response + else: + result = self.model.model_validate_json(response) + else: + response = await self.llm.acall( + [ + {"role": "system", "content": self.instructions}, + {"role": "user", "content": self.text}, + ] + ) + try: + result = self.model.model_validate_json(response) + except ValidationError: + result = handle_partial_json( # type: ignore[assignment] + result=response, + model=self.model, + is_json_output=False, + agent=None, + ) + if not isinstance(result, BaseModel): + if isinstance(result, dict): + result = self.model.model_validate(result) + elif isinstance(result, str): + try: + result = self.model.model_validate_json(result) + except Exception as parse_err: + raise ConverterError( + f"Failed to convert partial JSON result into Pydantic: {parse_err}" + ) from parse_err + else: + raise ConverterError( + "handle_partial_json returned an unexpected type." + ) from None + return result + except ValidationError as e: + if current_attempt < self.max_attempts: + return await self.ato_pydantic(current_attempt + 1) + raise ConverterError( + f"Failed to convert text into a Pydantic model due to validation error: {e}" + ) from e + except Exception as e: + if current_attempt < self.max_attempts: + return await self.ato_pydantic(current_attempt + 1) + raise ConverterError( + f"Failed to convert text into a Pydantic model due to error: {e}" + ) from e + + async def ato_json(self, current_attempt: int = 1) -> str | ConverterError | Any: + """Async version of to_json. Convert text to json without blocking the event loop. + + Args: + current_attempt: The current attempt number for conversion retries. + + Returns: + A JSON string or ConverterError if conversion fails. + + Raises: + ConverterError: If conversion fails after maximum attempts. + """ + try: + if self.llm.supports_function_calling(): + return self._create_instructor().to_json() + return json.dumps( + await self.llm.acall( + [ + {"role": "system", "content": self.instructions}, + {"role": "user", "content": self.text}, + ] + ) + ) + except Exception as e: + if current_attempt < self.max_attempts: + return await self.ato_json(current_attempt + 1) + return ConverterError(f"Failed to convert text into JSON, error: {e}.") + def _create_instructor(self) -> InternalInstructor[Any]: """Create an instructor.""" @@ -338,6 +435,174 @@ def convert_with_instructions( return exported_result +async def aconvert_to_model( + result: str, + output_pydantic: type[BaseModel] | None, + output_json: type[BaseModel] | None, + agent: Agent | BaseAgent | None = None, + converter_cls: type[Converter] | None = None, +) -> dict[str, Any] | BaseModel | str: + """Async version of convert_to_model. Convert a result string to a Pydantic model or JSON. + + Uses async LLM calls to avoid blocking the event loop. + + Args: + result: The result string to convert. + output_pydantic: The Pydantic model class to convert to. + output_json: The Pydantic model class to convert to JSON. + agent: The agent instance. + converter_cls: The converter class to use. + + Returns: + The converted result as a dict, BaseModel, or original string. + """ + model = output_pydantic or output_json + if model is None: + return result + + if converter_cls: + return await aconvert_with_instructions( + result=result, + model=model, + is_json_output=bool(output_json), + agent=agent, + converter_cls=converter_cls, + ) + + try: + escaped_result = json.dumps(json.loads(result, strict=False)) + return validate_model( + result=escaped_result, model=model, is_json_output=bool(output_json) + ) + except json.JSONDecodeError: + return await ahandle_partial_json( + result=result, + model=model, + is_json_output=bool(output_json), + agent=agent, + converter_cls=converter_cls, + ) + + except ValidationError: + return await ahandle_partial_json( + result=result, + model=model, + is_json_output=bool(output_json), + agent=agent, + converter_cls=converter_cls, + ) + + except Exception as e: + if agent and getattr(agent, "verbose", True): + Printer().print( + content=f"Unexpected error during model conversion: {type(e).__name__}: {e}. Returning original result.", + color="red", + ) + return result + + +async def ahandle_partial_json( + result: str, + model: type[BaseModel], + is_json_output: bool, + agent: Agent | BaseAgent | None, + converter_cls: type[Converter] | None = None, +) -> dict[str, Any] | BaseModel | str: + """Async version of handle_partial_json. + + Args: + result: The result string to process. + model: The Pydantic model class to convert to. + is_json_output: Whether to return a dict (True) or Pydantic model (False). + agent: The agent instance. + converter_cls: The converter class to use. + + Returns: + The converted result as a dict, BaseModel, or original string. + """ + match = _JSON_PATTERN.search(result) + if match: + try: + exported_result = model.model_validate_json(match.group()) + if is_json_output: + return exported_result.model_dump() + return exported_result + except json.JSONDecodeError: + pass + except ValidationError: + raise + except Exception as e: + if agent and getattr(agent, "verbose", True): + Printer().print( + content=f"Unexpected error during partial JSON handling: {type(e).__name__}: {e}. Attempting alternative conversion method.", + color="red", + ) + + return await aconvert_with_instructions( + result=result, + model=model, + is_json_output=is_json_output, + agent=agent, + converter_cls=converter_cls, + ) + + +async def aconvert_with_instructions( + result: str, + model: type[BaseModel], + is_json_output: bool, + agent: Agent | BaseAgent | None, + converter_cls: type[Converter] | None = None, +) -> dict[str, Any] | BaseModel | str: + """Async version of convert_with_instructions. + + Uses async LLM calls to avoid blocking the event loop. + + Args: + result: The result string to convert. + model: The Pydantic model class to convert to. + is_json_output: Whether to return a dict (True) or Pydantic model (False). + agent: The agent instance. + converter_cls: The converter class to use. + + Returns: + The converted result as a dict, BaseModel, or original string. + + Raises: + TypeError: If neither agent nor converter_cls is provided. + """ + if agent is None: + raise TypeError("Agent must be provided if converter_cls is not specified.") + + llm = getattr(agent, "function_calling_llm", None) or agent.llm + + if llm is None: + raise ValueError("Agent must have a valid LLM instance for conversion") + + instructions = get_conversion_instructions(model=model, llm=llm) + converter = create_converter( + agent=agent, + converter_cls=converter_cls, + llm=llm, + text=result, + model=model, + instructions=instructions, + ) + exported_result = ( + await converter.ato_pydantic() if not is_json_output else await converter.ato_json() + ) + + if isinstance(exported_result, ConverterError): + if agent and getattr(agent, "verbose", True): + Printer().print( + content=f"Failed to convert result to model: {exported_result}", + color="red", + ) + return result + + return exported_result + + def get_conversion_instructions( model: type[BaseModel], llm: BaseLLM | LLM | str | Any ) -> str: diff --git a/lib/crewai/tests/utilities/test_converter.py b/lib/crewai/tests/utilities/test_converter.py index 017f7f8ae..b306c7f5f 100644 --- a/lib/crewai/tests/utilities/test_converter.py +++ b/lib/crewai/tests/utilities/test_converter.py @@ -8,6 +8,9 @@ from crewai.llm import LLM from crewai.utilities.converter import ( Converter, ConverterError, + aconvert_to_model, + aconvert_with_instructions, + ahandle_partial_json, convert_to_model, convert_with_instructions, create_converter, @@ -952,3 +955,351 @@ def test_internal_instructor_real_unsupported_provider() -> None: # Verify it's a configuration error about unsupported provider assert "Unsupported provider" in str(exc_info.value) or "unsupported" in str(exc_info.value).lower() + + +# ============================================================ +# Async converter tests (issue #5230) +# ============================================================ + +@pytest.mark.asyncio +async def test_ato_pydantic_with_function_calling() -> None: + """Test that ato_pydantic uses llm.acall instead of llm.call.""" + from unittest.mock import AsyncMock + + llm = Mock(spec=LLM) + llm.supports_function_calling.return_value = True + llm.acall = AsyncMock(return_value='{"name": "Eve", "age": 35}') + + converter = Converter( + llm=llm, + text="Name: Eve, Age: 35", + model=SimpleModel, + instructions="Convert this text.", + ) + + output = await converter.ato_pydantic() + + assert isinstance(output, SimpleModel) + assert output.name == "Eve" + assert output.age == 35 + + # Verify acall was used, not call + llm.acall.assert_called_once() + assert not hasattr(llm, "call") or not llm.call.called + + +@pytest.mark.asyncio +async def test_ato_pydantic_without_function_calling() -> None: + """Test that ato_pydantic uses llm.acall for non-function-calling LLMs.""" + from unittest.mock import AsyncMock + + llm = Mock(spec=LLM) + llm.supports_function_calling.return_value = False + llm.acall = AsyncMock(return_value='{"name": "Alice", "age": 30}') + + converter = Converter( + llm=llm, + text="Name: Alice, Age: 30", + model=SimpleModel, + instructions="Convert this text.", + ) + + output = await converter.ato_pydantic() + + assert isinstance(output, SimpleModel) + assert output.name == "Alice" + assert output.age == 30 + llm.acall.assert_called_once() + + +@pytest.mark.asyncio +async def test_ato_json_without_function_calling() -> None: + """Test that ato_json uses llm.acall instead of llm.call.""" + from unittest.mock import AsyncMock + + llm = Mock(spec=LLM) + llm.supports_function_calling.return_value = False + llm.acall = AsyncMock(return_value='{"name": "Bob", "age": 40}') + + converter = Converter( + llm=llm, + text="Name: Bob, Age: 40", + model=SimpleModel, + instructions="Convert this text.", + ) + + output = await converter.ato_json() + + assert isinstance(output, str) + llm.acall.assert_called_once() + + +@pytest.mark.asyncio +async def test_ato_pydantic_retry_logic() -> None: + """Test that ato_pydantic retries on failure using acall.""" + from unittest.mock import AsyncMock + + llm = Mock(spec=LLM) + llm.supports_function_calling.return_value = False + llm.acall = AsyncMock( + side_effect=[ + "Invalid JSON", + "Still invalid", + '{"name": "Retry Alice", "age": 30}', + ] + ) + + converter = Converter( + llm=llm, + text="Name: Retry Alice, Age: 30", + model=SimpleModel, + instructions="Convert this text.", + max_attempts=3, + ) + + output = await converter.ato_pydantic() + + assert isinstance(output, SimpleModel) + assert output.name == "Retry Alice" + assert output.age == 30 + assert llm.acall.call_count == 3 + + +@pytest.mark.asyncio +async def test_ato_pydantic_error_after_max_attempts() -> None: + """Test that ato_pydantic raises ConverterError after max attempts.""" + from unittest.mock import AsyncMock + + llm = Mock(spec=LLM) + llm.supports_function_calling.return_value = False + llm.acall = AsyncMock(return_value="Invalid JSON") + + converter = Converter( + llm=llm, + text="Name: Alice, Age: 30", + model=SimpleModel, + instructions="Convert this text.", + max_attempts=3, + ) + + with pytest.raises(ConverterError) as exc_info: + await converter.ato_pydantic() + + assert "Failed to convert text into a Pydantic model" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_ato_json_retry_logic() -> None: + """Test that ato_json retries on failure using acall.""" + from unittest.mock import AsyncMock + + llm = Mock(spec=LLM) + llm.supports_function_calling.return_value = False + llm.acall = AsyncMock( + side_effect=[ + Exception("LLM error"), + Exception("LLM error again"), + '{"name": "Bob", "age": 40}', + ] + ) + + converter = Converter( + llm=llm, + text="Name: Bob, Age: 40", + model=SimpleModel, + instructions="Convert this text.", + max_attempts=3, + ) + + output = await converter.ato_json() + + assert isinstance(output, str) + assert llm.acall.call_count == 3 + + +@pytest.mark.asyncio +async def test_aconvert_to_model_with_valid_json() -> None: + """Test aconvert_to_model with valid JSON (no LLM call needed).""" + result = '{"name": "John", "age": 30}' + output = await aconvert_to_model(result, SimpleModel, None, None) + assert isinstance(output, SimpleModel) + assert output.name == "John" + assert output.age == 30 + + +@pytest.mark.asyncio +async def test_aconvert_to_model_with_no_model() -> None: + """Test aconvert_to_model returns plain text when no model specified.""" + result = "Plain text" + output = await aconvert_to_model(result, None, None, None) + assert output == "Plain text" + + +@pytest.mark.asyncio +async def test_aconvert_to_model_with_json_output() -> None: + """Test aconvert_to_model returns dict when output_json is specified.""" + result = '{"name": "John", "age": 30}' + output = await aconvert_to_model(result, None, SimpleModel, None) + assert isinstance(output, dict) + assert output == {"name": "John", "age": 30} + + +@pytest.mark.asyncio +async def test_aconvert_with_instructions_success() -> None: + """Test aconvert_with_instructions uses async converter methods.""" + from unittest.mock import AsyncMock + + mock_agent = Mock() + mock_agent.function_calling_llm = None + mock_agent.llm = Mock() + + with patch("crewai.utilities.converter.create_converter") as mock_create_converter, \ + patch("crewai.utilities.converter.get_conversion_instructions") as mock_get_instructions: + mock_get_instructions.return_value = "Instructions" + mock_converter = Mock() + mock_converter.ato_pydantic = AsyncMock( + return_value=SimpleModel(name="David", age=50) + ) + mock_create_converter.return_value = mock_converter + + result = "Some text to convert" + output = await aconvert_with_instructions(result, SimpleModel, False, mock_agent) + + assert isinstance(output, SimpleModel) + assert output.name == "David" + assert output.age == 50 + mock_converter.ato_pydantic.assert_called_once() + + +@pytest.mark.asyncio +async def test_aconvert_with_instructions_json_output() -> None: + """Test aconvert_with_instructions uses ato_json for JSON output.""" + from unittest.mock import AsyncMock + + mock_agent = Mock() + mock_agent.function_calling_llm = None + mock_agent.llm = Mock() + + with patch("crewai.utilities.converter.create_converter") as mock_create_converter, \ + patch("crewai.utilities.converter.get_conversion_instructions") as mock_get_instructions: + mock_get_instructions.return_value = "Instructions" + mock_converter = Mock() + mock_converter.ato_json = AsyncMock( + return_value='{"name": "David", "age": 50}' + ) + mock_create_converter.return_value = mock_converter + + result = "Some text to convert" + output = await aconvert_with_instructions(result, SimpleModel, True, mock_agent) + + assert output == '{"name": "David", "age": 50}' + mock_converter.ato_json.assert_called_once() + + +@pytest.mark.asyncio +async def test_aconvert_with_instructions_failure() -> None: + """Test aconvert_with_instructions returns original result on ConverterError.""" + from unittest.mock import AsyncMock + + mock_agent = Mock() + mock_agent.function_calling_llm = None + mock_agent.llm = Mock() + mock_agent.verbose = True + + with patch("crewai.utilities.converter.create_converter") as mock_create_converter, \ + patch("crewai.utilities.converter.get_conversion_instructions") as mock_get_instructions: + mock_get_instructions.return_value = "Instructions" + mock_converter = Mock() + mock_converter.ato_pydantic = AsyncMock( + return_value=ConverterError("Conversion failed") + ) + mock_create_converter.return_value = mock_converter + + result = "Some text to convert" + with patch("crewai.utilities.converter.Printer") as mock_printer: + output = await aconvert_with_instructions(result, SimpleModel, False, mock_agent) + assert output == result + mock_printer.return_value.print.assert_called_once() + + +@pytest.mark.asyncio +async def test_aconvert_with_instructions_no_agent() -> None: + """Test aconvert_with_instructions raises TypeError without agent.""" + with pytest.raises(TypeError, match="Agent must be provided"): + await aconvert_with_instructions("text", SimpleModel, False, None) + + +@pytest.mark.asyncio +async def test_ahandle_partial_json_with_valid_partial() -> None: + """Test ahandle_partial_json with valid embedded JSON.""" + result = 'Some text {"name": "Charlie", "age": 35} more text' + output = await ahandle_partial_json(result, SimpleModel, False, None) + assert isinstance(output, SimpleModel) + assert output.name == "Charlie" + assert output.age == 35 + + +@pytest.mark.asyncio +async def test_ahandle_partial_json_delegates_to_aconvert_with_instructions() -> None: + """Test ahandle_partial_json delegates to aconvert_with_instructions for invalid JSON.""" + from unittest.mock import AsyncMock + + mock_agent = Mock() + mock_agent.function_calling_llm = None + mock_agent.llm = Mock() + + result = "No valid JSON here" + with patch("crewai.utilities.converter.aconvert_with_instructions", new_callable=AsyncMock) as mock_aconvert: + mock_aconvert.return_value = "Converted result" + output = await ahandle_partial_json(result, SimpleModel, False, mock_agent) + assert output == "Converted result" + mock_aconvert.assert_called_once() + + +@pytest.mark.asyncio +async def test_async_converter_does_not_call_sync_llm_call() -> None: + """Core test for issue #5230: verify that async converter path never uses sync llm.call().""" + from unittest.mock import AsyncMock + + llm = Mock(spec=LLM) + llm.supports_function_calling.return_value = True + llm.acall = AsyncMock(return_value='{"name": "Test", "age": 25}') + llm.call = Mock(side_effect=AssertionError("sync llm.call() should not be called in async path")) + + converter = Converter( + llm=llm, + text="Name: Test, Age: 25", + model=SimpleModel, + instructions="Convert this text.", + ) + + # ato_pydantic should use acall, never call + output = await converter.ato_pydantic() + assert isinstance(output, SimpleModel) + assert output.name == "Test" + assert output.age == 25 + llm.acall.assert_called_once() + llm.call.assert_not_called() + + +@pytest.mark.asyncio +async def test_async_converter_json_does_not_call_sync_llm_call() -> None: + """Verify ato_json for non-function-calling LLMs never uses sync llm.call().""" + from unittest.mock import AsyncMock + + llm = Mock(spec=LLM) + llm.supports_function_calling.return_value = False + llm.acall = AsyncMock(return_value='{"name": "Test", "age": 25}') + llm.call = Mock(side_effect=AssertionError("sync llm.call() should not be called in async path")) + + converter = Converter( + llm=llm, + text="Name: Test, Age: 25", + model=SimpleModel, + instructions="Convert this text.", + ) + + output = await converter.ato_json() + assert isinstance(output, str) + llm.acall.assert_called_once() + llm.call.assert_not_called()