Compare commits

...

3 Commits

Author SHA1 Message Date
Devin AI
415b894405 fix: wrap sync instructor.to_json() in asyncio.to_thread for ato_json
Addresses Cursor Bugbot feedback: the function-calling path in ato_json
was still calling _create_instructor().to_json() synchronously, blocking
the event loop. Now wrapped in asyncio.to_thread() to offload the
blocking call to a separate thread.

Co-Authored-By: João <joao@crewai.com>
2026-04-02 12:54:20 +00:00
Devin AI
85369cf23e style: fix ruff formatting in converter.py
Co-Authored-By: João <joao@crewai.com>
2026-04-02 12:44:44 +00:00
Devin AI
128552a5fa fix: use async LLM calls in converter for async workflows (issue #5230)
When using akickoff() (async workflow), the _export_output method and
_ainvoke_guardrail_function were calling synchronous llm.call() through
the converter module, which blocks the event loop.

Changes:
- Add ato_pydantic() and ato_json() async methods to Converter class
  that use llm.acall() instead of llm.call()
- Add async utility functions: aconvert_to_model(), ahandle_partial_json(),
  aconvert_with_instructions()
- Add _aexport_output() async method to Task class
- Wire _aexport_output into _aexecute_core and _ainvoke_guardrail_function
  replacing the sync _export_output calls in async paths
- Add comprehensive async tests covering all new async converter paths

Fixes #5230

Co-Authored-By: João <joao@crewai.com>
2026-04-02 12:39:01 +00:00
3 changed files with 651 additions and 4 deletions

View File

@@ -47,7 +47,7 @@ from crewai.tasks.task_output import TaskOutput
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.converter import Converter, aconvert_to_model, convert_to_model
from crewai.utilities.file_store import (
clear_task_files,
get_all_files,
@@ -602,7 +602,7 @@ class Task(BaseModel):
json_output = None
elif not self._guardrails and not self._guardrail:
raw = result
pydantic_output, json_output = self._export_output(result)
pydantic_output, json_output = await self._aexport_output(result)
else:
raw = result
pydantic_output, json_output = None, None
@@ -1040,6 +1040,34 @@ Follow these guidelines:
return pydantic_output, json_output
async def _aexport_output(
self, result: str
) -> tuple[BaseModel | None, dict[str, Any] | None]:
"""Async version of _export_output that uses async LLM calls."""
pydantic_output: BaseModel | None = None
json_output: dict[str, Any] | None = None
if self.output_pydantic or self.output_json:
model_output = await aconvert_to_model(
result,
self.output_pydantic,
self.output_json,
self.agent,
self.converter_cls,
)
if isinstance(model_output, BaseModel):
pydantic_output = model_output
elif isinstance(model_output, dict):
json_output = model_output
elif isinstance(model_output, str):
try:
json_output = json.loads(model_output)
except json.JSONDecodeError:
json_output = None
return pydantic_output, json_output
def _get_output_format(self) -> OutputFormat:
if self.output_json:
return OutputFormat.JSON
@@ -1255,7 +1283,7 @@ Follow these guidelines:
if isinstance(guardrail_result.result, str):
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(
pydantic_output, json_output = await self._aexport_output(
guardrail_result.result
)
task_output.pydantic = pydantic_output
@@ -1300,7 +1328,7 @@ Follow these guidelines:
tools=tools,
)
pydantic_output, json_output = self._export_output(result)
pydantic_output, json_output = await self._aexport_output(result)
task_output = TaskOutput(
name=self.name or self.description,
description=self.description,

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
import json
import re
from typing import TYPE_CHECKING, Any, Final, TypedDict
@@ -142,6 +143,103 @@ class Converter(OutputConverter):
return self.to_json(current_attempt + 1)
return ConverterError(f"Failed to convert text into JSON, error: {e}.")
async def ato_pydantic(self, current_attempt: int = 1) -> BaseModel:
"""Async version of to_pydantic. Convert text to pydantic without blocking the event loop.
Args:
current_attempt: The current attempt number for conversion retries.
Returns:
A Pydantic BaseModel instance.
Raises:
ConverterError: If conversion fails after maximum attempts.
"""
try:
if self.llm.supports_function_calling():
response = await self.llm.acall(
messages=[
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
],
response_model=self.model,
)
if isinstance(response, BaseModel):
result = response
else:
result = self.model.model_validate_json(response)
else:
response = await self.llm.acall(
[
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
]
)
try:
result = self.model.model_validate_json(response)
except ValidationError:
result = handle_partial_json( # type: ignore[assignment]
result=response,
model=self.model,
is_json_output=False,
agent=None,
)
if not isinstance(result, BaseModel):
if isinstance(result, dict):
result = self.model.model_validate(result)
elif isinstance(result, str):
try:
result = self.model.model_validate_json(result)
except Exception as parse_err:
raise ConverterError(
f"Failed to convert partial JSON result into Pydantic: {parse_err}"
) from parse_err
else:
raise ConverterError(
"handle_partial_json returned an unexpected type."
) from None
return result
except ValidationError as e:
if current_attempt < self.max_attempts:
return await self.ato_pydantic(current_attempt + 1)
raise ConverterError(
f"Failed to convert text into a Pydantic model due to validation error: {e}"
) from e
except Exception as e:
if current_attempt < self.max_attempts:
return await self.ato_pydantic(current_attempt + 1)
raise ConverterError(
f"Failed to convert text into a Pydantic model due to error: {e}"
) from e
async def ato_json(self, current_attempt: int = 1) -> str | ConverterError | Any:
"""Async version of to_json. Convert text to json without blocking the event loop.
Args:
current_attempt: The current attempt number for conversion retries.
Returns:
A JSON string or ConverterError if conversion fails.
Raises:
ConverterError: If conversion fails after maximum attempts.
"""
try:
if self.llm.supports_function_calling():
return await asyncio.to_thread(self._create_instructor().to_json)
return json.dumps(
await self.llm.acall(
[
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
]
)
)
except Exception as e:
if current_attempt < self.max_attempts:
return await self.ato_json(current_attempt + 1)
return ConverterError(f"Failed to convert text into JSON, error: {e}.")
def _create_instructor(self) -> InternalInstructor[Any]:
"""Create an instructor."""
@@ -338,6 +436,176 @@ def convert_with_instructions(
return exported_result
async def aconvert_to_model(
result: str,
output_pydantic: type[BaseModel] | None,
output_json: type[BaseModel] | None,
agent: Agent | BaseAgent | None = None,
converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
"""Async version of convert_to_model. Convert a result string to a Pydantic model or JSON.
Uses async LLM calls to avoid blocking the event loop.
Args:
result: The result string to convert.
output_pydantic: The Pydantic model class to convert to.
output_json: The Pydantic model class to convert to JSON.
agent: The agent instance.
converter_cls: The converter class to use.
Returns:
The converted result as a dict, BaseModel, or original string.
"""
model = output_pydantic or output_json
if model is None:
return result
if converter_cls:
return await aconvert_with_instructions(
result=result,
model=model,
is_json_output=bool(output_json),
agent=agent,
converter_cls=converter_cls,
)
try:
escaped_result = json.dumps(json.loads(result, strict=False))
return validate_model(
result=escaped_result, model=model, is_json_output=bool(output_json)
)
except json.JSONDecodeError:
return await ahandle_partial_json(
result=result,
model=model,
is_json_output=bool(output_json),
agent=agent,
converter_cls=converter_cls,
)
except ValidationError:
return await ahandle_partial_json(
result=result,
model=model,
is_json_output=bool(output_json),
agent=agent,
converter_cls=converter_cls,
)
except Exception as e:
if agent and getattr(agent, "verbose", True):
Printer().print(
content=f"Unexpected error during model conversion: {type(e).__name__}: {e}. Returning original result.",
color="red",
)
return result
async def ahandle_partial_json(
result: str,
model: type[BaseModel],
is_json_output: bool,
agent: Agent | BaseAgent | None,
converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
"""Async version of handle_partial_json.
Args:
result: The result string to process.
model: The Pydantic model class to convert to.
is_json_output: Whether to return a dict (True) or Pydantic model (False).
agent: The agent instance.
converter_cls: The converter class to use.
Returns:
The converted result as a dict, BaseModel, or original string.
"""
match = _JSON_PATTERN.search(result)
if match:
try:
exported_result = model.model_validate_json(match.group())
if is_json_output:
return exported_result.model_dump()
return exported_result
except json.JSONDecodeError:
pass
except ValidationError:
raise
except Exception as e:
if agent and getattr(agent, "verbose", True):
Printer().print(
content=f"Unexpected error during partial JSON handling: {type(e).__name__}: {e}. Attempting alternative conversion method.",
color="red",
)
return await aconvert_with_instructions(
result=result,
model=model,
is_json_output=is_json_output,
agent=agent,
converter_cls=converter_cls,
)
async def aconvert_with_instructions(
result: str,
model: type[BaseModel],
is_json_output: bool,
agent: Agent | BaseAgent | None,
converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
"""Async version of convert_with_instructions.
Uses async LLM calls to avoid blocking the event loop.
Args:
result: The result string to convert.
model: The Pydantic model class to convert to.
is_json_output: Whether to return a dict (True) or Pydantic model (False).
agent: The agent instance.
converter_cls: The converter class to use.
Returns:
The converted result as a dict, BaseModel, or original string.
Raises:
TypeError: If neither agent nor converter_cls is provided.
"""
if agent is None:
raise TypeError("Agent must be provided if converter_cls is not specified.")
llm = getattr(agent, "function_calling_llm", None) or agent.llm
if llm is None:
raise ValueError("Agent must have a valid LLM instance for conversion")
instructions = get_conversion_instructions(model=model, llm=llm)
converter = create_converter(
agent=agent,
converter_cls=converter_cls,
llm=llm,
text=result,
model=model,
instructions=instructions,
)
exported_result = (
await converter.ato_pydantic()
if not is_json_output
else await converter.ato_json()
)
if isinstance(exported_result, ConverterError):
if agent and getattr(agent, "verbose", True):
Printer().print(
content=f"Failed to convert result to model: {exported_result}",
color="red",
)
return result
return exported_result
def get_conversion_instructions(
model: type[BaseModel], llm: BaseLLM | LLM | str | Any
) -> str:

View File

@@ -8,6 +8,9 @@ from crewai.llm import LLM
from crewai.utilities.converter import (
Converter,
ConverterError,
aconvert_to_model,
aconvert_with_instructions,
ahandle_partial_json,
convert_to_model,
convert_with_instructions,
create_converter,
@@ -952,3 +955,351 @@ def test_internal_instructor_real_unsupported_provider() -> None:
# Verify it's a configuration error about unsupported provider
assert "Unsupported provider" in str(exc_info.value) or "unsupported" in str(exc_info.value).lower()
# ============================================================
# Async converter tests (issue #5230)
# ============================================================
@pytest.mark.asyncio
async def test_ato_pydantic_with_function_calling() -> None:
"""Test that ato_pydantic uses llm.acall instead of llm.call."""
from unittest.mock import AsyncMock
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = True
llm.acall = AsyncMock(return_value='{"name": "Eve", "age": 35}')
converter = Converter(
llm=llm,
text="Name: Eve, Age: 35",
model=SimpleModel,
instructions="Convert this text.",
)
output = await converter.ato_pydantic()
assert isinstance(output, SimpleModel)
assert output.name == "Eve"
assert output.age == 35
# Verify acall was used, not call
llm.acall.assert_called_once()
assert not hasattr(llm, "call") or not llm.call.called
@pytest.mark.asyncio
async def test_ato_pydantic_without_function_calling() -> None:
"""Test that ato_pydantic uses llm.acall for non-function-calling LLMs."""
from unittest.mock import AsyncMock
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = False
llm.acall = AsyncMock(return_value='{"name": "Alice", "age": 30}')
converter = Converter(
llm=llm,
text="Name: Alice, Age: 30",
model=SimpleModel,
instructions="Convert this text.",
)
output = await converter.ato_pydantic()
assert isinstance(output, SimpleModel)
assert output.name == "Alice"
assert output.age == 30
llm.acall.assert_called_once()
@pytest.mark.asyncio
async def test_ato_json_without_function_calling() -> None:
"""Test that ato_json uses llm.acall instead of llm.call."""
from unittest.mock import AsyncMock
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = False
llm.acall = AsyncMock(return_value='{"name": "Bob", "age": 40}')
converter = Converter(
llm=llm,
text="Name: Bob, Age: 40",
model=SimpleModel,
instructions="Convert this text.",
)
output = await converter.ato_json()
assert isinstance(output, str)
llm.acall.assert_called_once()
@pytest.mark.asyncio
async def test_ato_pydantic_retry_logic() -> None:
"""Test that ato_pydantic retries on failure using acall."""
from unittest.mock import AsyncMock
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = False
llm.acall = AsyncMock(
side_effect=[
"Invalid JSON",
"Still invalid",
'{"name": "Retry Alice", "age": 30}',
]
)
converter = Converter(
llm=llm,
text="Name: Retry Alice, Age: 30",
model=SimpleModel,
instructions="Convert this text.",
max_attempts=3,
)
output = await converter.ato_pydantic()
assert isinstance(output, SimpleModel)
assert output.name == "Retry Alice"
assert output.age == 30
assert llm.acall.call_count == 3
@pytest.mark.asyncio
async def test_ato_pydantic_error_after_max_attempts() -> None:
"""Test that ato_pydantic raises ConverterError after max attempts."""
from unittest.mock import AsyncMock
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = False
llm.acall = AsyncMock(return_value="Invalid JSON")
converter = Converter(
llm=llm,
text="Name: Alice, Age: 30",
model=SimpleModel,
instructions="Convert this text.",
max_attempts=3,
)
with pytest.raises(ConverterError) as exc_info:
await converter.ato_pydantic()
assert "Failed to convert text into a Pydantic model" in str(exc_info.value)
@pytest.mark.asyncio
async def test_ato_json_retry_logic() -> None:
"""Test that ato_json retries on failure using acall."""
from unittest.mock import AsyncMock
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = False
llm.acall = AsyncMock(
side_effect=[
Exception("LLM error"),
Exception("LLM error again"),
'{"name": "Bob", "age": 40}',
]
)
converter = Converter(
llm=llm,
text="Name: Bob, Age: 40",
model=SimpleModel,
instructions="Convert this text.",
max_attempts=3,
)
output = await converter.ato_json()
assert isinstance(output, str)
assert llm.acall.call_count == 3
@pytest.mark.asyncio
async def test_aconvert_to_model_with_valid_json() -> None:
"""Test aconvert_to_model with valid JSON (no LLM call needed)."""
result = '{"name": "John", "age": 30}'
output = await aconvert_to_model(result, SimpleModel, None, None)
assert isinstance(output, SimpleModel)
assert output.name == "John"
assert output.age == 30
@pytest.mark.asyncio
async def test_aconvert_to_model_with_no_model() -> None:
"""Test aconvert_to_model returns plain text when no model specified."""
result = "Plain text"
output = await aconvert_to_model(result, None, None, None)
assert output == "Plain text"
@pytest.mark.asyncio
async def test_aconvert_to_model_with_json_output() -> None:
"""Test aconvert_to_model returns dict when output_json is specified."""
result = '{"name": "John", "age": 30}'
output = await aconvert_to_model(result, None, SimpleModel, None)
assert isinstance(output, dict)
assert output == {"name": "John", "age": 30}
@pytest.mark.asyncio
async def test_aconvert_with_instructions_success() -> None:
"""Test aconvert_with_instructions uses async converter methods."""
from unittest.mock import AsyncMock
mock_agent = Mock()
mock_agent.function_calling_llm = None
mock_agent.llm = Mock()
with patch("crewai.utilities.converter.create_converter") as mock_create_converter, \
patch("crewai.utilities.converter.get_conversion_instructions") as mock_get_instructions:
mock_get_instructions.return_value = "Instructions"
mock_converter = Mock()
mock_converter.ato_pydantic = AsyncMock(
return_value=SimpleModel(name="David", age=50)
)
mock_create_converter.return_value = mock_converter
result = "Some text to convert"
output = await aconvert_with_instructions(result, SimpleModel, False, mock_agent)
assert isinstance(output, SimpleModel)
assert output.name == "David"
assert output.age == 50
mock_converter.ato_pydantic.assert_called_once()
@pytest.mark.asyncio
async def test_aconvert_with_instructions_json_output() -> None:
"""Test aconvert_with_instructions uses ato_json for JSON output."""
from unittest.mock import AsyncMock
mock_agent = Mock()
mock_agent.function_calling_llm = None
mock_agent.llm = Mock()
with patch("crewai.utilities.converter.create_converter") as mock_create_converter, \
patch("crewai.utilities.converter.get_conversion_instructions") as mock_get_instructions:
mock_get_instructions.return_value = "Instructions"
mock_converter = Mock()
mock_converter.ato_json = AsyncMock(
return_value='{"name": "David", "age": 50}'
)
mock_create_converter.return_value = mock_converter
result = "Some text to convert"
output = await aconvert_with_instructions(result, SimpleModel, True, mock_agent)
assert output == '{"name": "David", "age": 50}'
mock_converter.ato_json.assert_called_once()
@pytest.mark.asyncio
async def test_aconvert_with_instructions_failure() -> None:
"""Test aconvert_with_instructions returns original result on ConverterError."""
from unittest.mock import AsyncMock
mock_agent = Mock()
mock_agent.function_calling_llm = None
mock_agent.llm = Mock()
mock_agent.verbose = True
with patch("crewai.utilities.converter.create_converter") as mock_create_converter, \
patch("crewai.utilities.converter.get_conversion_instructions") as mock_get_instructions:
mock_get_instructions.return_value = "Instructions"
mock_converter = Mock()
mock_converter.ato_pydantic = AsyncMock(
return_value=ConverterError("Conversion failed")
)
mock_create_converter.return_value = mock_converter
result = "Some text to convert"
with patch("crewai.utilities.converter.Printer") as mock_printer:
output = await aconvert_with_instructions(result, SimpleModel, False, mock_agent)
assert output == result
mock_printer.return_value.print.assert_called_once()
@pytest.mark.asyncio
async def test_aconvert_with_instructions_no_agent() -> None:
"""Test aconvert_with_instructions raises TypeError without agent."""
with pytest.raises(TypeError, match="Agent must be provided"):
await aconvert_with_instructions("text", SimpleModel, False, None)
@pytest.mark.asyncio
async def test_ahandle_partial_json_with_valid_partial() -> None:
"""Test ahandle_partial_json with valid embedded JSON."""
result = 'Some text {"name": "Charlie", "age": 35} more text'
output = await ahandle_partial_json(result, SimpleModel, False, None)
assert isinstance(output, SimpleModel)
assert output.name == "Charlie"
assert output.age == 35
@pytest.mark.asyncio
async def test_ahandle_partial_json_delegates_to_aconvert_with_instructions() -> None:
"""Test ahandle_partial_json delegates to aconvert_with_instructions for invalid JSON."""
from unittest.mock import AsyncMock
mock_agent = Mock()
mock_agent.function_calling_llm = None
mock_agent.llm = Mock()
result = "No valid JSON here"
with patch("crewai.utilities.converter.aconvert_with_instructions", new_callable=AsyncMock) as mock_aconvert:
mock_aconvert.return_value = "Converted result"
output = await ahandle_partial_json(result, SimpleModel, False, mock_agent)
assert output == "Converted result"
mock_aconvert.assert_called_once()
@pytest.mark.asyncio
async def test_async_converter_does_not_call_sync_llm_call() -> None:
"""Core test for issue #5230: verify that async converter path never uses sync llm.call()."""
from unittest.mock import AsyncMock
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = True
llm.acall = AsyncMock(return_value='{"name": "Test", "age": 25}')
llm.call = Mock(side_effect=AssertionError("sync llm.call() should not be called in async path"))
converter = Converter(
llm=llm,
text="Name: Test, Age: 25",
model=SimpleModel,
instructions="Convert this text.",
)
# ato_pydantic should use acall, never call
output = await converter.ato_pydantic()
assert isinstance(output, SimpleModel)
assert output.name == "Test"
assert output.age == 25
llm.acall.assert_called_once()
llm.call.assert_not_called()
@pytest.mark.asyncio
async def test_async_converter_json_does_not_call_sync_llm_call() -> None:
"""Verify ato_json for non-function-calling LLMs never uses sync llm.call()."""
from unittest.mock import AsyncMock
llm = Mock(spec=LLM)
llm.supports_function_calling.return_value = False
llm.acall = AsyncMock(return_value='{"name": "Test", "age": 25}')
llm.call = Mock(side_effect=AssertionError("sync llm.call() should not be called in async path"))
converter = Converter(
llm=llm,
text="Name: Test, Age: 25",
model=SimpleModel,
instructions="Convert this text.",
)
output = await converter.ato_json()
assert isinstance(output, str)
llm.acall.assert_called_once()
llm.call.assert_not_called()