Show typed tool output to the agent as JSON

Tools with an `output_schema` reached the agent as a Python repr instead
of clean JSON, because the native-tool executors stringified the typed
result with `str()`. Send every tool result through
`format_output_for_agent` so the agent reads valid JSON, across all
executors and `ToolUsage`.

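The cache still stores the raw result, so cache callbacks keep getting
the original typed object. Cached values are passed through
`format_output_for_agent` when they are read back, so cache hits and
fresh calls give the agent the same text.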
This commit is contained in:
Vinicius Brasil
2026-06-18 21:04:13 -07:00
parent ba7533ed9d
commit 267b519896
9 changed files with 219 additions and 72 deletions

View File

@@ -907,19 +907,29 @@ class CrewAgentExecutor(BaseAgentExecutor):
):
max_usage_reached = True
structured_tool: CrewStructuredTool | None = None
if original_tool is not None:
for structured in self.tools or []:
if getattr(structured, "_original_tool", None) is original_tool:
structured_tool = structured
break
if structured_tool is None:
for structured in self.tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
output_tool = original_tool or structured_tool
from_cache = False
result: str = "Tool not found"
input_str = json.dumps(args_dict) if args_dict else ""
if self.tools_handler and self.tools_handler.cache:
if self.tools_handler and self.tools_handler.cache and output_tool is not None:
cached_result = self.tools_handler.cache.read(
tool=func_name, input=input_str
)
if cached_result is not None:
result = (
str(cached_result)
if not isinstance(cached_result, str)
else cached_result
)
result = output_tool.format_output_for_agent(cached_result)
from_cache = True
agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown"
@@ -938,18 +948,6 @@ class CrewAgentExecutor(BaseAgentExecutor):
track_delegation_if_needed(func_name, args_dict or {}, self.task)
structured_tool: CrewStructuredTool | None = None
if original_tool is not None:
for structured in self.tools or []:
if getattr(structured, "_original_tool", None) is original_tool:
structured_tool = structured
break
if structured_tool is None:
for structured in self.tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
hook_blocked = False
before_hook_context = ToolCallHookContext(
tool_name=func_name,
@@ -977,7 +975,11 @@ class CrewAgentExecutor(BaseAgentExecutor):
result = f"Tool execution blocked by hook. Tool: {func_name}"
elif max_usage_reached and original_tool:
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
elif not from_cache and func_name in available_functions:
elif (
not from_cache
and func_name in available_functions
and output_tool is not None
):
try:
raw_result = available_functions[func_name](**(args_dict or {}))
@@ -996,9 +998,7 @@ class CrewAgentExecutor(BaseAgentExecutor):
tool=func_name, input=input_str, output=raw_result
)
result = (
str(raw_result) if not isinstance(raw_result, str) else raw_result
)
result = output_tool.format_output_for_agent(raw_result)
except Exception as e:
result = f"Error executing tool: {e}"
if self.task:

View File

@@ -1905,19 +1905,29 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
):
max_usage_reached = True
structured_tool: CrewStructuredTool | None = None
if original_tool is not None:
for structured in self.tools or []:
if getattr(structured, "_original_tool", None) is original_tool:
structured_tool = structured
break
if structured_tool is None:
for structured in self.tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
output_tool = original_tool or structured_tool
# Check cache before executing
from_cache = False
input_str = json.dumps(args_dict) if args_dict else ""
if self.tools_handler and self.tools_handler.cache:
if self.tools_handler and self.tools_handler.cache and output_tool is not None:
cached_result = self.tools_handler.cache.read(
tool=func_name, input=input_str
)
if cached_result is not None:
result = (
str(cached_result)
if not isinstance(cached_result, str)
else cached_result
)
result = output_tool.format_output_for_agent(cached_result)
from_cache = True
# Emit tool usage started event
@@ -1936,18 +1946,6 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
track_delegation_if_needed(func_name, args_dict, self.task)
structured_tool: CrewStructuredTool | None = None
if original_tool is not None:
for structured in self.tools or []:
if getattr(structured, "_original_tool", None) is original_tool:
structured_tool = structured
break
if structured_tool is None:
for structured in self.tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
hook_blocked = False
before_hook_context = ToolCallHookContext(
tool_name=func_name,
@@ -1973,7 +1971,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
if hook_blocked:
result = f"Tool execution blocked by hook. Tool: {func_name}"
elif not from_cache and not max_usage_reached:
elif not from_cache and not max_usage_reached and output_tool is not None:
result = "Tool not found"
if func_name in self._available_functions:
try:
@@ -1992,12 +1990,7 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
tool=func_name, input=input_str, output=raw_result
)
# Convert to string for message
result = (
str(raw_result)
if not isinstance(raw_result, str)
else raw_result
)
result = output_tool.format_output_for_agent(raw_result)
except Exception as e:
result = f"Error executing tool: {e}"
if self.task:

View File

@@ -359,7 +359,9 @@ class ToolUsage:
tool_name=sanitize_tool_name(tool.name),
attempts=self._run_attempts,
)
result = self._format_result(result=result)
result = self._format_result(
result=tool.format_output_for_agent(result)
)
data = {
"result": result,
"tool_name": sanitize_tool_name(tool.name),
@@ -430,7 +432,9 @@ class ToolUsage:
self.task.increment_tools_errors()
should_retry = True
else:
result = self._format_result(result=result)
result = self._format_result(
result=tool.format_output_for_agent(result)
)
finally:
if started_event_emitted and not error_event_emitted:
@@ -590,7 +594,9 @@ class ToolUsage:
tool_name=sanitize_tool_name(tool.name),
attempts=self._run_attempts,
)
result = self._format_result(result=result)
result = self._format_result(
result=tool.format_output_for_agent(result)
)
data = {
"result": result,
"tool_name": sanitize_tool_name(tool.name),
@@ -661,7 +667,9 @@ class ToolUsage:
self.task.increment_tools_errors()
should_retry = True
else:
result = self._format_result(result=result)
result = self._format_result(
result=tool.format_output_for_agent(result)
)
finally:
if started_event_emitted and not error_event_emitted:

View File

@@ -1456,18 +1456,22 @@ def execute_single_native_tool_call(
original_tool = tool
break
structured_tool: CrewStructuredTool | None = None
for structured in structured_tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
output_tool = original_tool or structured_tool
from_cache = False
input_str = json.dumps(args_dict) if args_dict else ""
result = "Tool not found"
if tools_handler and tools_handler.cache:
if tools_handler and tools_handler.cache and output_tool is not None:
cached_result = tools_handler.cache.read(tool=func_name, input=input_str)
if cached_result is not None:
result = (
str(cached_result)
if not isinstance(cached_result, str)
else cached_result
)
result = output_tool.format_output_for_agent(cached_result)
from_cache = True
started_at = datetime.now()
@@ -1486,12 +1490,6 @@ def execute_single_native_tool_call(
track_delegation_if_needed(func_name, args_dict, task)
structured_tool: CrewStructuredTool | None = None
for structured in structured_tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
hook_blocked = False
before_hook_context = ToolCallHookContext(
tool_name=func_name,
@@ -1513,7 +1511,7 @@ def execute_single_native_tool_call(
if hook_blocked:
result = f"Tool execution blocked by hook. Tool: {func_name}"
elif not from_cache:
if func_name in available_functions:
if func_name in available_functions and output_tool is not None:
try:
tool_func = available_functions[func_name]
raw_result = tool_func(**args_dict)
@@ -1529,9 +1527,7 @@ def execute_single_native_tool_call(
tool=func_name, input=input_str, output=raw_result
)
result = (
str(raw_result) if not isinstance(raw_result, str) else raw_result
)
result = output_tool.format_output_for_agent(raw_result)
except Exception as e:
result = f"Error executing tool: {e}"
if task:

View File

@@ -7,6 +7,7 @@ when the LLM supports it, across multiple providers.
from __future__ import annotations
from collections.abc import Generator
import json
import os
import threading
import time
@@ -1197,6 +1198,35 @@ class TestNativeToolCallingJsonParseError:
assert result["result"] == "ran: print(1)"
def test_typed_output_is_json_agent_text(self) -> None:
class SearchOutput(BaseModel):
query: str
score: float
class TypedSearchTool(BaseTool):
name: str = "typed_search"
description: str = "Search for information"
output_schema: type[BaseModel] = SearchOutput
def _run(self, query: str) -> SearchOutput:
return SearchOutput(query=query, score=0.8)
tool = TypedSearchTool()
executor = self._make_executor([tool])
from crewai.utilities.agent_utils import convert_tools_to_openai_schema
_, available_functions, _ = convert_tools_to_openai_schema([tool])
result = executor._execute_single_native_tool_call(
call_id="call_typed",
func_name="typed_search",
func_args='{"query": "crew"}',
available_functions=available_functions,
)
assert json.loads(result["result"]) == {"query": "crew", "score": 0.8}
def test_native_tool_loop_falls_back_when_provider_rejects_tools(self) -> None:
"""Unsupported native tools errors should continue through ReAct."""

View File

@@ -422,8 +422,6 @@ def _make_root_decorator_tool() -> BaseTool:
class TestToolOutputSchema:
"""Tests for typed tool output behavior."""
@pytest.mark.parametrize(
("tool_cls", "expected_raw", "expected_agent_payload"),
[

View File

@@ -149,7 +149,6 @@ def test_from_function_returns_raw_result_and_json_agent_text(
expected_raw,
expected_agent_payload,
):
"""Typed structured tools return raw values and format JSON for the agent."""
kwargs = {"output_schema": output_schema} if output_schema is not None else {}
tool = CrewStructuredTool.from_function(
func=func,
@@ -166,7 +165,6 @@ def test_from_function_returns_raw_result_and_json_agent_text(
def test_from_function_does_not_infer_non_pydantic_output_schema():
"""Non-Pydantic return annotations use the plain string formatter."""
tool = CrewStructuredTool.from_function(
func=_build_plain_structured_value,
name="build_value",
@@ -179,7 +177,6 @@ def test_from_function_does_not_infer_non_pydantic_output_schema():
def test_invalid_typed_output_warns_and_uses_string_agent_text():
"""Invalid structured output leaves the raw result unchanged."""
def build_value(value: str) -> dict[str, object]:
"""Build a value."""
return {"value": value, "count": "wrong"}

View File

@@ -1,4 +1,5 @@
import datetime
from collections.abc import Callable
import json
import random
import threading
@@ -15,6 +16,7 @@ from crewai.events.types.tool_usage_events import (
ToolValidateInputErrorEvent,
)
from crewai.tools import BaseTool
from crewai.tools.tool_calling import ToolCalling
from crewai.tools.tool_usage import ToolUsage
from pydantic import BaseModel, Field
import pytest
@@ -38,6 +40,19 @@ class RandomNumberTool(BaseTool):
return random.randint(min_value, max_value) # noqa: S311
class SearchOutput(BaseModel):
query: str
score: float
class TypedSearchTool(BaseTool):
name: str = "typed_search"
description: str = "Search for a query"
def _run(self, query: str) -> SearchOutput:
return SearchOutput(query=query, score=0.7)
# Example agent and task
example_agent = Agent(
role="Number Generator",
@@ -117,6 +132,68 @@ def test_tool_usage_render():
assert '"description": "The maximum value of the range (inclusive)"' in rendered
def test_tool_usage_returns_json_agent_text_for_typed_output():
tool = TypedSearchTool().to_structured_tool()
tool_usage = ToolUsage(
tools_handler=None,
tools=[tool],
task=None,
function_calling_llm=MagicMock(),
agent=None,
action=MagicMock(),
)
result = tool_usage.use(
calling=ToolCalling(
tool_name="typed_search",
arguments={"query": "crew"},
),
tool_string='Action: typed_search\nAction Input: {"query": "crew"}',
)
assert json.loads(result) == {"query": "crew", "score": 0.7}
def test_tool_usage_cache_callback_receives_raw_typed_output():
raw_results: list[object] = []
def cache_result(_args: object, result: object) -> bool:
raw_results.append(result)
return True
class CacheAwareTypedSearchTool(TypedSearchTool):
cache_function: Callable = cache_result
tools_handler = MagicMock()
tools_handler.cache = None
tools_handler.last_used_tool = None
tool = CacheAwareTypedSearchTool().to_structured_tool()
tool_usage = ToolUsage(
tools_handler=tools_handler,
tools=[tool],
task=None,
function_calling_llm=MagicMock(),
agent=None,
action=MagicMock(),
)
result = tool_usage.use(
calling=ToolCalling(
tool_name="typed_search",
arguments={"query": "crew"},
),
tool_string='Action: typed_search\nAction Input: {"query": "crew"}',
)
assert json.loads(result) == {"query": "crew", "score": 0.7}
assert raw_results == [SearchOutput(query="crew", score=0.7)]
tools_handler.on_tool_use.assert_called_once()
assert tools_handler.on_tool_use.call_args.kwargs["output"] == SearchOutput(
query="crew",
score=0.7,
)
def test_validate_tool_input_booleans_and_none():
tool_usage = ToolUsage(
tools_handler=MagicMock(),

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import asyncio
import json
from typing import Any, Literal, Optional
from unittest.mock import AsyncMock, MagicMock, patch
@@ -1030,6 +1031,53 @@ class TestParseToolCallArgs:
class TestExecuteSingleNativeToolCall:
"""Tests for execute_single_native_tool_call."""
def test_typed_tool_output_is_json_agent_text(self) -> None:
from crewai.hooks.tool_hooks import (
clear_after_tool_call_hooks,
clear_before_tool_call_hooks,
)
clear_before_tool_call_hooks()
clear_after_tool_call_hooks()
class SearchOutput(BaseModel):
query: str
score: float
class TypedSearchTool(BaseTool):
name: str = "typed_search"
description: str = "Search for a query"
output_schema: type[BaseModel] = SearchOutput
def _run(self, query: str) -> SearchOutput:
return SearchOutput(query=query, score=0.9)
tool = TypedSearchTool()
tool_call = MagicMock()
tool_call.id = "call_1"
tool_call.function.name = "typed_search"
tool_call.function.arguments = '{"query": "crew"}'
result = execute_single_native_tool_call(
tool_call,
available_functions={"typed_search": tool._run},
original_tools=[tool],
structured_tools=[tool.to_structured_tool()],
tools_handler=None,
agent=None,
task=None,
crew=None,
event_source=MagicMock(),
printer=None,
verbose=False,
)
assert json.loads(result.result) == {"query": "crew", "score": 0.9}
assert json.loads(result.tool_message["content"]) == {
"query": "crew",
"score": 0.9,
}
def test_result_as_answer_false_on_tool_error(self) -> None:
"""When a tool with result_as_answer=True raises, result_as_answer must be False.