fix(task): use acall for output conversion in async paths

This commit is contained in:
Greyson LaLonde
2026-05-04 18:42:12 +08:00
committed by GitHub
parent 184c228ae9
commit a058a3b15b
3 changed files with 337 additions and 64 deletions

View File

@@ -53,7 +53,11 @@ from crewai.tasks.task_output import TaskOutput
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.converter import (
Converter,
async_convert_to_model,
convert_to_model,
)
from crewai.utilities.file_store import (
clear_task_files,
get_all_files,
@@ -681,7 +685,7 @@ class Task(BaseModel):
json_output = None
elif not self._guardrails and not self._guardrail:
raw = result
pydantic_output, json_output = self._export_output(result)
pydantic_output, json_output = await self._aexport_output(result)
else:
raw = result
pydantic_output, json_output = None, None
@@ -1123,19 +1127,44 @@ Follow these guidelines:
self.agent,
self.converter_cls,
)
if isinstance(model_output, BaseModel):
pydantic_output = model_output
elif isinstance(model_output, dict):
json_output = model_output
elif isinstance(model_output, str):
try:
json_output = json.loads(model_output)
except json.JSONDecodeError:
json_output = None
pydantic_output, json_output = self._unpack_model_output(model_output)
return pydantic_output, json_output
async def _aexport_output(
self, result: str | BaseModel
) -> tuple[BaseModel | None, dict[str, Any] | None]:
"""Async equivalent of ``_export_output`` — uses ``acall`` so the event loop is not blocked."""
pydantic_output: BaseModel | None = None
json_output: dict[str, Any] | None = None
if self.output_pydantic or self.output_json:
model_output = await async_convert_to_model(
result,
self.output_pydantic,
self.output_json,
self.agent,
self.converter_cls,
)
pydantic_output, json_output = self._unpack_model_output(model_output)
return pydantic_output, json_output
@staticmethod
def _unpack_model_output(
    model_output: dict[str, Any] | BaseModel | str,
) -> tuple[BaseModel | None, dict[str, Any] | None]:
    """Split a converter result into ``(pydantic_output, json_output)``.

    A ``BaseModel`` fills the first slot, a ``dict`` the second; a string
    is parsed as JSON into the second slot when possible. Anything else
    (or unparseable text) yields ``(None, None)``.
    """
    pydantic_output: BaseModel | None = None
    json_output: dict[str, Any] | None = None
    if isinstance(model_output, BaseModel):
        pydantic_output = model_output
    elif isinstance(model_output, dict):
        json_output = model_output
    elif isinstance(model_output, str):
        try:
            json_output = json.loads(model_output)
        except json.JSONDecodeError:
            json_output = None
    return pydantic_output, json_output
def _get_output_format(self) -> OutputFormat:
if self.output_json:
return OutputFormat.JSON
@@ -1364,7 +1393,7 @@ Follow these guidelines:
if isinstance(guardrail_result.result, str):
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(
pydantic_output, json_output = await self._aexport_output(
guardrail_result.result
)
task_output.pydantic = pydantic_output
@@ -1421,7 +1450,7 @@ Follow these guidelines:
json_output = None
else:
raw = result
pydantic_output, json_output = self._export_output(result)
pydantic_output, json_output = await self._aexport_output(result)
task_output = TaskOutput(
name=self.name or self.description,

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
import json
import re
from typing import TYPE_CHECKING, Any, Final, TypedDict
@@ -41,6 +42,45 @@ class ConverterError(Exception):
class Converter(OutputConverter):
"""Class that converts text into either pydantic or json."""
def _build_messages(self) -> list[dict[str, str]]:
return [
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
]
def _coerce_response_to_pydantic(self, response: Any) -> BaseModel:
    """Validate an LLM response into the configured Pydantic model.

    Pure post-processing — performs no I/O. Shared by ``to_pydantic`` and
    ``ato_pydantic`` so the validation/partial-JSON fallback logic stays
    in a single place.

    Raises:
        ConverterError: When neither direct validation nor the
            partial-JSON fallback yields a usable model instance.
    """
    if isinstance(response, BaseModel):
        # Function-calling backends may already hand back a model instance.
        return response
    try:
        return self.model.model_validate_json(response)
    except ValidationError:
        # Direct validation failed; try to salvage a JSON fragment.
        salvaged = handle_partial_json(
            result=response,
            model=self.model,
            is_json_output=False,
            agent=None,
        )
        if isinstance(salvaged, BaseModel):
            return salvaged
        if isinstance(salvaged, dict):
            return self.model.model_validate(salvaged)
        if isinstance(salvaged, str):
            try:
                return self.model.model_validate_json(salvaged)
            except Exception as parse_err:
                raise ConverterError(
                    f"Failed to convert partial JSON result into Pydantic: {parse_err}"
                ) from parse_err
        raise ConverterError(
            "handle_partial_json returned an unexpected type."
        ) from None
def to_pydantic(self, current_attempt: int = 1) -> BaseModel:
"""Convert text to pydantic.
@@ -56,50 +96,12 @@ class Converter(OutputConverter):
try:
if self.llm.supports_function_calling():
response = self.llm.call(
messages=[
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
],
messages=self._build_messages(),
response_model=self.model,
)
if isinstance(response, BaseModel):
result = response
else:
result = self.model.model_validate_json(response)
else:
response = self.llm.call(
[
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
]
)
try:
# Try to directly validate the response JSON
result = self.model.model_validate_json(response)
except ValidationError:
# If direct validation fails, attempt to extract valid JSON
result = handle_partial_json( # type: ignore[assignment]
result=response,
model=self.model,
is_json_output=False,
agent=None,
)
# Ensure result is a BaseModel instance
if not isinstance(result, BaseModel):
if isinstance(result, dict):
result = self.model.model_validate(result)
elif isinstance(result, str):
try:
result = self.model.model_validate_json(result)
except Exception as parse_err:
raise ConverterError(
f"Failed to convert partial JSON result into Pydantic: {parse_err}"
) from parse_err
else:
raise ConverterError(
"handle_partial_json returned an unexpected type."
) from None
return result
response = self.llm.call(self._build_messages())
return self._coerce_response_to_pydantic(response)
except ValidationError as e:
if current_attempt < self.max_attempts:
return self.to_pydantic(current_attempt + 1)
@@ -113,6 +115,30 @@ class Converter(OutputConverter):
f"Failed to convert text into a Pydantic model due to error: {e}"
) from e
async def ato_pydantic(self, current_attempt: int = 1) -> BaseModel:
    """Async counterpart of ``to_pydantic`` — awaits ``acall`` so the
    event loop is never blocked by the LLM round-trip.

    Args:
        current_attempt: 1-based retry counter; retries up to
            ``self.max_attempts`` before giving up.

    Raises:
        ConverterError: When every attempt fails to produce a valid model.
    """
    try:
        messages = self._build_messages()
        if self.llm.supports_function_calling():
            raw = await self.llm.acall(
                messages=messages,
                response_model=self.model,
            )
        else:
            raw = await self.llm.acall(messages)
        return self._coerce_response_to_pydantic(raw)
    except ValidationError as exc:
        if current_attempt < self.max_attempts:
            return await self.ato_pydantic(current_attempt + 1)
        raise ConverterError(
            f"Failed to convert text into a Pydantic model due to validation error: {exc}"
        ) from exc
    except Exception as exc:
        if current_attempt < self.max_attempts:
            return await self.ato_pydantic(current_attempt + 1)
        raise ConverterError(
            f"Failed to convert text into a Pydantic model due to error: {exc}"
        ) from exc
def to_json(self, current_attempt: int = 1) -> str | ConverterError | Any: # type: ignore[override]
"""Convert text to json.
@@ -129,19 +155,28 @@ class Converter(OutputConverter):
try:
if self.llm.supports_function_calling():
return self._create_instructor().to_json()
return json.dumps(
self.llm.call(
[
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
]
)
)
return json.dumps(self.llm.call(self._build_messages()))
except Exception as e:
if current_attempt < self.max_attempts:
return self.to_json(current_attempt + 1)
return ConverterError(f"Failed to convert text into JSON, error: {e}.")
async def ato_json(self, current_attempt: int = 1) -> str | ConverterError | Any:
    """Async counterpart of ``to_json``.

    The function-calling branch delegates to ``InternalInstructor``
    (currently sync-only), so it is offloaded with ``asyncio.to_thread``
    to keep the event loop free. Failures are retried up to
    ``self.max_attempts``; the final failure is *returned* (not raised)
    as a ``ConverterError``, mirroring ``to_json``.
    """
    try:
        if not self.llm.supports_function_calling():
            response = await self.llm.acall(self._build_messages())
            return json.dumps(response)
        instructor = self._create_instructor()
        return await asyncio.to_thread(instructor.to_json)
    except Exception as exc:
        if current_attempt < self.max_attempts:
            return await self.ato_json(current_attempt + 1)
        return ConverterError(f"Failed to convert text into JSON, error: {exc}.")
def _create_instructor(self) -> InternalInstructor[Any]:
"""Create an instructor."""
@@ -354,6 +389,144 @@ def convert_with_instructions(
return exported_result
async def async_convert_to_model(
    result: str | BaseModel,
    output_pydantic: type[BaseModel] | None,
    output_json: type[BaseModel] | None,
    agent: Agent | BaseAgent | None = None,
    converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
    """Async equivalent of ``convert_to_model`` — uses native ``acall``.

    Dispatch order mirrors the sync version exactly; the only difference
    is that LLM-bearing branches are awaited.

    Returns:
        The converted model/dict, or the original ``result`` unchanged
        when no target model is configured or an unexpected error occurs.
    """
    target = output_pydantic or output_json
    if target is None:
        # No structured output configured — pass the result through.
        return result
    is_json = bool(output_json)
    if isinstance(result, BaseModel):
        if isinstance(result, target):
            # Already the right model; dump to a dict only for JSON output.
            return result.model_dump() if is_json else result
        # Wrong model type — serialize and re-convert below.
        result = result.model_dump_json()
    if converter_cls:
        return await async_convert_with_instructions(
            result=result,
            model=target,
            is_json_output=is_json,
            agent=agent,
            converter_cls=converter_cls,
        )
    try:
        # Round-trip through json to normalize escaping before validation.
        normalized = json.dumps(json.loads(result, strict=False))
        return validate_model(result=normalized, model=target, is_json_output=is_json)
    except (json.JSONDecodeError, ValidationError):
        return await async_handle_partial_json(
            result=result,
            model=target,
            is_json_output=is_json,
            agent=agent,
            converter_cls=converter_cls,
        )
    except Exception as e:
        if agent and getattr(agent, "verbose", True):
            PRINTER.print(
                content=f"Unexpected error during model conversion: {type(e).__name__}: {e}. Returning original result.",
                color="red",
            )
        return result
async def async_handle_partial_json(
    result: str,
    model: type[BaseModel],
    is_json_output: bool,
    agent: Agent | BaseAgent | None,
    converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
    """Async equivalent of ``handle_partial_json`` — defers LLM fallback to ``acall``.

    Tries to pull a JSON fragment out of ``result`` with ``_JSON_PATTERN``
    and validate it against ``model``; when that fails (or no fragment is
    found) it falls back to LLM-driven conversion via
    ``async_convert_with_instructions``.

    Raises:
        ValidationError: re-raised unchanged when the extracted fragment
            parses as JSON but fails model validation.
    """
    match = _JSON_PATTERN.search(result)
    if match:
        try:
            parsed = json.loads(match.group(), strict=False)
        except json.JSONDecodeError:
            # Fragment is not valid JSON — hand the whole text to the LLM.
            return await async_convert_with_instructions(
                result=result,
                model=model,
                is_json_output=is_json_output,
                agent=agent,
                converter_cls=converter_cls,
            )
        try:
            exported_result = model.model_validate(parsed)
            if is_json_output:
                return exported_result.model_dump()
            return exported_result
        except ValidationError:
            # Deliberately propagated so callers can distinguish
            # validation failures from parse failures.
            raise
        except Exception as e:
            if agent and getattr(agent, "verbose", True):
                PRINTER.print(
                    content=f"Unexpected error during partial JSON handling: {type(e).__name__}: {e}. Attempting alternative conversion method.",
                    color="red",
                )
    # No JSON fragment found (or an unexpected error above): LLM fallback.
    # NOTE(review): reconstructed nesting — the original rendering lost
    # indentation; confirm this final return sits at function level.
    return await async_convert_with_instructions(
        result=result,
        model=model,
        is_json_output=is_json_output,
        agent=agent,
        converter_cls=converter_cls,
    )
async def async_convert_with_instructions(
    result: str,
    model: type[BaseModel],
    is_json_output: bool,
    agent: Agent | BaseAgent | None,
    converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
    """Async equivalent of ``convert_with_instructions`` — delegates to the
    converter's ``ato_pydantic``/``ato_json`` coroutines.

    Raises:
        TypeError: If ``agent`` is ``None``.
        ValueError: If the agent carries no usable LLM.
    """
    if agent is None:
        raise TypeError("Agent must be provided if converter_cls is not specified.")
    llm = getattr(agent, "function_calling_llm", None) or agent.llm
    if llm is None:
        raise ValueError("Agent must have a valid LLM instance for conversion")
    instructions = get_conversion_instructions(model=model, llm=llm)
    converter = create_converter(
        agent=agent,
        converter_cls=converter_cls,
        llm=llm,
        text=result,
        model=model,
        instructions=instructions,
    )
    if is_json_output:
        converted = await converter.ato_json()
    else:
        converted = await converter.ato_pydantic()
    if isinstance(converted, ConverterError):
        if agent and getattr(agent, "verbose", True):
            PRINTER.print(
                content=f"Failed to convert result to model: {converted}",
                color="red",
            )
        # Conversion failed — fall back to the raw result.
        return result
    return converted
def get_conversion_instructions(
model: type[BaseModel], llm: BaseLLM | LLM | str | Any
) -> str:

View File

@@ -1,12 +1,14 @@
"""Tests for async task execution."""
import pytest
from pydantic import BaseModel
from unittest.mock import AsyncMock, MagicMock, patch
from crewai.agent import Agent
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.tasks.output_format import OutputFormat
from crewai.utilities.converter import Converter
@pytest.fixture
@@ -383,4 +385,73 @@ class TestAsyncTaskOutput:
assert result.description == "Test description"
assert result.expected_output == "Test expected"
assert result.raw == "Test result"
assert result.agent == "Test Agent"
assert result.agent == "Test Agent"
class _AsyncOnlyOutput(BaseModel):
    """Minimal output schema used by the async-conversion regression tests."""

    # Single required field keeps the JSON fixtures one-liners.
    value: str
class TestAsyncOutputConversion:
    """Regression tests for native-async output conversion (issue #5230).

    Ensures `_aexport_output` reaches the LLM via `acall` and never via the
    blocking `call` method.
    """

    @pytest.mark.asyncio
    async def test_aexport_output_uses_acall_not_call(self) -> None:
        # LLM double: the async path succeeds, while the sync path is
        # booby-trapped so any accidental call() fails the test loudly.
        mock_llm = MagicMock()
        mock_llm.supports_function_calling.return_value = False
        mock_llm.acall = AsyncMock(return_value='{"value": "ok"}')
        mock_llm.call = MagicMock(
            side_effect=AssertionError("call() must NOT be invoked from async path")
        )
        converter = Converter(
            llm=mock_llm,
            model=_AsyncOnlyOutput,
            text="raw",
            instructions="convert",
            max_attempts=1,
        )
        result = await converter.ato_pydantic()
        assert isinstance(result, _AsyncOnlyOutput)
        assert result.value == "ok"
        mock_llm.acall.assert_awaited_once()
        mock_llm.call.assert_not_called()

    @pytest.mark.asyncio
    async def test_ato_json_function_calling_does_not_block_event_loop(self) -> None:
        """The function-calling JSON path must run via asyncio.to_thread.

        ``InternalInstructor`` is sync-only; `ato_json` should offload it so the
        event loop is not blocked.
        """
        mock_llm = MagicMock()
        mock_llm.supports_function_calling.return_value = True
        converter = Converter(
            llm=mock_llm,
            model=_AsyncOnlyOutput,
            text="raw",
            instructions="convert",
            max_attempts=1,
        )
        sentinel = '{"value": "ok"}'
        with patch.object(
            converter, "_create_instructor"
        ) as mock_create, patch(
            "crewai.utilities.converter.asyncio.to_thread", new_callable=AsyncMock
        ) as mock_to_thread:
            # to_thread is stubbed: we verify that the instructor's to_json
            # is what gets offloaded, not that a real worker thread runs.
            instructor = MagicMock()
            instructor.to_json = MagicMock(return_value=sentinel)
            mock_create.return_value = instructor
            mock_to_thread.return_value = sentinel
            result = await converter.ato_json()
            assert result == sentinel
            mock_to_thread.assert_awaited_once_with(instructor.to_json)