From a058a3b15bf5a486f312dacac5f82e7157b246c4 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Mon, 4 May 2026 18:42:12 +0800 Subject: [PATCH] fix(task): use acall for output conversion in async paths --- lib/crewai/src/crewai/task.py | 57 +++- lib/crewai/src/crewai/utilities/converter.py | 271 +++++++++++++++---- lib/crewai/tests/task/test_async_task.py | 73 ++++- 3 files changed, 337 insertions(+), 64 deletions(-) diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index 8ab0695be..28136097f 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -53,7 +53,11 @@ from crewai.tasks.task_output import TaskOutput from crewai.tools.base_tool import BaseTool from crewai.utilities.config import process_config from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified -from crewai.utilities.converter import Converter, convert_to_model +from crewai.utilities.converter import ( + Converter, + async_convert_to_model, + convert_to_model, +) from crewai.utilities.file_store import ( clear_task_files, get_all_files, @@ -681,7 +685,7 @@ class Task(BaseModel): json_output = None elif not self._guardrails and not self._guardrail: raw = result - pydantic_output, json_output = self._export_output(result) + pydantic_output, json_output = await self._aexport_output(result) else: raw = result pydantic_output, json_output = None, None @@ -1123,19 +1127,44 @@ Follow these guidelines: self.agent, self.converter_cls, ) - - if isinstance(model_output, BaseModel): - pydantic_output = model_output - elif isinstance(model_output, dict): - json_output = model_output - elif isinstance(model_output, str): - try: - json_output = json.loads(model_output) - except json.JSONDecodeError: - json_output = None + pydantic_output, json_output = self._unpack_model_output(model_output) return pydantic_output, json_output + async def _aexport_output( + self, result: str | BaseModel + ) -> tuple[BaseModel | None, dict[str, Any] | None]: + """Async equivalent of ``_export_output`` — uses ``acall`` so the event loop is not blocked.""" + pydantic_output: BaseModel | None = None + json_output: dict[str, Any] | None = None + + if self.output_pydantic or self.output_json: + model_output = await async_convert_to_model( + result, + self.output_pydantic, + self.output_json, + self.agent, + self.converter_cls, + ) + pydantic_output, json_output = self._unpack_model_output(model_output) + + return pydantic_output, json_output + + @staticmethod + def _unpack_model_output( + model_output: dict[str, Any] | BaseModel | str, + ) -> tuple[BaseModel | None, dict[str, Any] | None]: + if isinstance(model_output, BaseModel): + return model_output, None + if isinstance(model_output, dict): + return None, model_output + if isinstance(model_output, str): + try: + return None, json.loads(model_output) + except json.JSONDecodeError: + return None, None + return None, None + def _get_output_format(self) -> OutputFormat: if self.output_json: return OutputFormat.JSON @@ -1364,7 +1393,7 @@ Follow these guidelines: if isinstance(guardrail_result.result, str): task_output.raw = guardrail_result.result - pydantic_output, json_output = self._export_output( + pydantic_output, json_output = await self._aexport_output( guardrail_result.result ) task_output.pydantic = pydantic_output @@ -1421,7 +1450,7 @@ Follow these guidelines: json_output = None else: raw = result - pydantic_output, json_output = self._export_output(result) + pydantic_output, json_output = await self._aexport_output(result) task_output = TaskOutput( name=self.name or self.description, diff --git a/lib/crewai/src/crewai/utilities/converter.py b/lib/crewai/src/crewai/utilities/converter.py index bcab4da18..e8a73f192 100644 --- a/lib/crewai/src/crewai/utilities/converter.py +++ b/lib/crewai/src/crewai/utilities/converter.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import json import re from typing import TYPE_CHECKING, Any, Final, TypedDict @@ -41,6 +42,45 @@ class ConverterError(Exception): class Converter(OutputConverter): """Class that converts text into either pydantic or json.""" + def _build_messages(self) -> list[dict[str, str]]: + return [ + {"role": "system", "content": self.instructions}, + {"role": "user", "content": self.text}, + ] + + def _coerce_response_to_pydantic(self, response: Any) -> BaseModel: + """Validate an LLM response into the configured Pydantic model. + + Pure post-processing — performs no I/O. Shared by ``to_pydantic`` and + ``ato_pydantic`` so the validation/partial-JSON fallback logic stays in + a single place. + """ + if isinstance(response, BaseModel): + return response + try: + return self.model.model_validate_json(response) + except ValidationError: + partial = handle_partial_json( + result=response, + model=self.model, + is_json_output=False, + agent=None, + ) + if isinstance(partial, BaseModel): + return partial + if isinstance(partial, dict): + return self.model.model_validate(partial) + if isinstance(partial, str): + try: + return self.model.model_validate_json(partial) + except Exception as parse_err: + raise ConverterError( + f"Failed to convert partial JSON result into Pydantic: {parse_err}" + ) from parse_err + raise ConverterError( + "handle_partial_json returned an unexpected type." + ) from None + def to_pydantic(self, current_attempt: int = 1) -> BaseModel: """Convert text to pydantic. @@ -56,50 +96,12 @@ class Converter(OutputConverter): try: if self.llm.supports_function_calling(): response = self.llm.call( - messages=[ - {"role": "system", "content": self.instructions}, - {"role": "user", "content": self.text}, - ], + messages=self._build_messages(), response_model=self.model, ) - if isinstance(response, BaseModel): - result = response - else: - result = self.model.model_validate_json(response) else: - response = self.llm.call( - [ - {"role": "system", "content": self.instructions}, - {"role": "user", "content": self.text}, - ] - ) - try: - # Try to directly validate the response JSON - result = self.model.model_validate_json(response) - except ValidationError: - # If direct validation fails, attempt to extract valid JSON - result = handle_partial_json( # type: ignore[assignment] - result=response, - model=self.model, - is_json_output=False, - agent=None, - ) - # Ensure result is a BaseModel instance - if not isinstance(result, BaseModel): - if isinstance(result, dict): - result = self.model.model_validate(result) - elif isinstance(result, str): - try: - result = self.model.model_validate_json(result) - except Exception as parse_err: - raise ConverterError( - f"Failed to convert partial JSON result into Pydantic: {parse_err}" - ) from parse_err - else: - raise ConverterError( - "handle_partial_json returned an unexpected type." - ) from None - return result + response = self.llm.call(self._build_messages()) + return self._coerce_response_to_pydantic(response) except ValidationError as e: if current_attempt < self.max_attempts: return self.to_pydantic(current_attempt + 1) @@ -113,6 +115,30 @@ class Converter(OutputConverter): f"Failed to convert text into a Pydantic model due to error: {e}" ) from e + async def ato_pydantic(self, current_attempt: int = 1) -> BaseModel: + """Async equivalent of ``to_pydantic`` — uses ``acall`` so the event loop is not blocked.""" + try: + if self.llm.supports_function_calling(): + response = await self.llm.acall( + messages=self._build_messages(), + response_model=self.model, + ) + else: + response = await self.llm.acall(self._build_messages()) + return self._coerce_response_to_pydantic(response) + except ValidationError as e: + if current_attempt < self.max_attempts: + return await self.ato_pydantic(current_attempt + 1) + raise ConverterError( + f"Failed to convert text into a Pydantic model due to validation error: {e}" + ) from e + except Exception as e: + if current_attempt < self.max_attempts: + return await self.ato_pydantic(current_attempt + 1) + raise ConverterError( + f"Failed to convert text into a Pydantic model due to error: {e}" + ) from e + def to_json(self, current_attempt: int = 1) -> str | ConverterError | Any: # type: ignore[override] """Convert text to json. @@ -129,19 +155,28 @@ class Converter(OutputConverter): try: if self.llm.supports_function_calling(): return self._create_instructor().to_json() - return json.dumps( - self.llm.call( - [ - {"role": "system", "content": self.instructions}, - {"role": "user", "content": self.text}, - ] - ) - ) + return json.dumps(self.llm.call(self._build_messages())) except Exception as e: if current_attempt < self.max_attempts: return self.to_json(current_attempt + 1) return ConverterError(f"Failed to convert text into JSON, error: {e}.") + async def ato_json(self, current_attempt: int = 1) -> str | ConverterError | Any: + """Async equivalent of ``to_json``. + + The function-calling path delegates to ``InternalInstructor`` (currently + sync-only); we run it via ``asyncio.to_thread`` so the event loop stays + free. + """ + try: + if self.llm.supports_function_calling(): + return await asyncio.to_thread(self._create_instructor().to_json) + return json.dumps(await self.llm.acall(self._build_messages())) + except Exception as e: + if current_attempt < self.max_attempts: + return await self.ato_json(current_attempt + 1) + return ConverterError(f"Failed to convert text into JSON, error: {e}.") + def _create_instructor(self) -> InternalInstructor[Any]: """Create an instructor.""" @@ -354,6 +389,144 @@ def convert_with_instructions( return exported_result +async def async_convert_to_model( + result: str | BaseModel, + output_pydantic: type[BaseModel] | None, + output_json: type[BaseModel] | None, + agent: Agent | BaseAgent | None = None, + converter_cls: type[Converter] | None = None, +) -> dict[str, Any] | BaseModel | str: + """Async equivalent of ``convert_to_model`` — uses native ``acall``. + + Mirrors the dispatch semantics of the sync version exactly; the only + difference is that LLM-bearing branches are awaited. + """ + model = output_pydantic or output_json + if model is None: + return result + + if isinstance(result, BaseModel): + if isinstance(result, model): + return result.model_dump() if output_json else result + result = result.model_dump_json() + + if converter_cls: + return await async_convert_with_instructions( + result=result, + model=model, + is_json_output=bool(output_json), + agent=agent, + converter_cls=converter_cls, + ) + + try: + escaped_result = json.dumps(json.loads(result, strict=False)) + return validate_model( + result=escaped_result, model=model, is_json_output=bool(output_json) + ) + except (json.JSONDecodeError, ValidationError): + return await async_handle_partial_json( + result=result, + model=model, + is_json_output=bool(output_json), + agent=agent, + converter_cls=converter_cls, + ) + except Exception as e: + if agent and getattr(agent, "verbose", True): + PRINTER.print( + content=f"Unexpected error during model conversion: {type(e).__name__}: {e}. Returning original result.", + color="red", + ) + return result + + +async def async_handle_partial_json( + result: str, + model: type[BaseModel], + is_json_output: bool, + agent: Agent | BaseAgent | None, + converter_cls: type[Converter] | None = None, +) -> dict[str, Any] | BaseModel | str: + """Async equivalent of ``handle_partial_json`` — defers LLM fallback to ``acall``.""" + match = _JSON_PATTERN.search(result) + if match: + try: + parsed = json.loads(match.group(), strict=False) + except json.JSONDecodeError: + return await async_convert_with_instructions( + result=result, + model=model, + is_json_output=is_json_output, + agent=agent, + converter_cls=converter_cls, + ) + + try: + exported_result = model.model_validate(parsed) + if is_json_output: + return exported_result.model_dump() + return exported_result + except ValidationError: + raise + except Exception as e: + if agent and getattr(agent, "verbose", True): + PRINTER.print( + content=f"Unexpected error during partial JSON handling: {type(e).__name__}: {e}. Attempting alternative conversion method.", + color="red", + ) + + return await async_convert_with_instructions( + result=result, + model=model, + is_json_output=is_json_output, + agent=agent, + converter_cls=converter_cls, + ) + + +async def async_convert_with_instructions( + result: str, + model: type[BaseModel], + is_json_output: bool, + agent: Agent | BaseAgent | None, + converter_cls: type[Converter] | None = None, +) -> dict[str, Any] | BaseModel | str: + """Async equivalent of ``convert_with_instructions`` — calls ``ato_pydantic``/``ato_json``.""" + if agent is None: + raise TypeError("Agent must be provided if converter_cls is not specified.") + + llm = getattr(agent, "function_calling_llm", None) or agent.llm + + if llm is None: + raise ValueError("Agent must have a valid LLM instance for conversion") + + instructions = get_conversion_instructions(model=model, llm=llm) + converter = create_converter( + agent=agent, + converter_cls=converter_cls, + llm=llm, + text=result, + model=model, + instructions=instructions, + ) + exported_result = ( + await converter.ato_pydantic() + if not is_json_output + else await converter.ato_json() + ) + + if isinstance(exported_result, ConverterError): + if agent and getattr(agent, "verbose", True): + PRINTER.print( + content=f"Failed to convert result to model: {exported_result}", + color="red", + ) + return result + + return exported_result + + def get_conversion_instructions( model: type[BaseModel], llm: BaseLLM | LLM | str | Any ) -> str: diff --git a/lib/crewai/tests/task/test_async_task.py b/lib/crewai/tests/task/test_async_task.py index 70fec377d..fff65b539 100644 --- a/lib/crewai/tests/task/test_async_task.py +++ b/lib/crewai/tests/task/test_async_task.py @@ -1,12 +1,14 @@ """Tests for async task execution.""" import pytest +from pydantic import BaseModel from unittest.mock import AsyncMock, MagicMock, patch from crewai.agent import Agent from crewai.task import Task from crewai.tasks.task_output import TaskOutput from crewai.tasks.output_format import OutputFormat +from crewai.utilities.converter import Converter @pytest.fixture @@ -383,4 +385,73 @@ class TestAsyncTaskOutput: assert result.description == "Test description" assert result.expected_output == "Test expected" assert result.raw == "Test result" - assert result.agent == "Test Agent" \ No newline at end of file + assert result.agent == "Test Agent" + + +class _AsyncOnlyOutput(BaseModel): + value: str + + +class TestAsyncOutputConversion: + """Regression tests for native-async output conversion (issue #5230). + + Ensures `_aexport_output` reaches the LLM via `acall` and never via the + blocking `call` method. + """ + + @pytest.mark.asyncio + async def test_aexport_output_uses_acall_not_call(self) -> None: + mock_llm = MagicMock() + mock_llm.supports_function_calling.return_value = False + mock_llm.acall = AsyncMock(return_value='{"value": "ok"}') + mock_llm.call = MagicMock( + side_effect=AssertionError("call() must NOT be invoked from async path") + ) + + converter = Converter( + llm=mock_llm, + model=_AsyncOnlyOutput, + text="raw", + instructions="convert", + max_attempts=1, + ) + result = await converter.ato_pydantic() + + assert isinstance(result, _AsyncOnlyOutput) + assert result.value == "ok" + mock_llm.acall.assert_awaited_once() + mock_llm.call.assert_not_called() + + @pytest.mark.asyncio + async def test_ato_json_function_calling_does_not_block_event_loop(self) -> None: + """The function-calling JSON path must run via asyncio.to_thread. + + ``InternalInstructor`` is sync-only; `ato_json` should offload it so the + event loop is not blocked. + """ + mock_llm = MagicMock() + mock_llm.supports_function_calling.return_value = True + + converter = Converter( + llm=mock_llm, + model=_AsyncOnlyOutput, + text="raw", + instructions="convert", + max_attempts=1, + ) + + sentinel = '{"value": "ok"}' + with patch.object( + converter, "_create_instructor" + ) as mock_create, patch( + "crewai.utilities.converter.asyncio.to_thread", new_callable=AsyncMock + ) as mock_to_thread: + instructor = MagicMock() + instructor.to_json = MagicMock(return_value=sentinel) + mock_create.return_value = instructor + mock_to_thread.return_value = sentinel + + result = await converter.ato_json() + + assert result == sentinel + mock_to_thread.assert_awaited_once_with(instructor.to_json) \ No newline at end of file