Files
crewAI/lib/crewai/tests/a2a/utils/test_delegation.py
Alex b626dd5ec9 fix: only fall back to polling when streaming is explicitly False
Addresses review: agents that leave streaming unset (None) should get the
default StreamingHandler, not polling. Only agents that explicitly set
streaming=False trigger the polling fallback.
2026-04-12 16:38:44 -07:00

198 lines
7.0 KiB
Python

"""Tests for A2A delegation utilities."""
from __future__ import annotations
from contextlib import asynccontextmanager, ExitStack
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from a2a.client import Client
from a2a.types import AgentCapabilities, AgentCard
from crewai.a2a.updates.polling.handler import PollingHandler
from crewai.a2a.updates.streaming.handler import StreamingHandler
from crewai.a2a.utils.delegation import get_handler, _aexecute_a2a_delegation_impl
from crewai.a2a.utils.transport import NegotiatedTransport
class TestGetHandler:
"""Tests for the get_handler helper."""
def test_returns_streaming_handler_when_config_is_none(self) -> None:
assert get_handler(None) is StreamingHandler
def test_returns_polling_handler_for_polling_config(self) -> None:
from crewai.a2a.updates import PollingConfig
assert get_handler(PollingConfig()) is PollingHandler
def _make_agent_card(streaming: bool | None) -> AgentCard:
"""Build a minimal AgentCard with the given streaming capability."""
capabilities = AgentCapabilities(streaming=streaming)
return AgentCard(
name="test-agent",
description="A test agent",
url="http://localhost:9999/",
version="1.0.0",
capabilities=capabilities,
defaultInputModes=["text/plain"],
defaultOutputModes=["text/plain"],
skills=[],
)
_TASK_RESULT = {"status": "completed", "result": "done", "history": []}
def _make_shared_patches(agent_card: AgentCard) -> list:
"""Return the common patches used across the delegation tests."""
mock_client = MagicMock(spec=Client)
@asynccontextmanager
async def _fake_client_ctx(*args: Any, **kwargs: Any):
yield mock_client
negotiated = NegotiatedTransport(
transport="JSONRPC",
url=agent_card.url,
source="server_card",
)
return [
patch(
"crewai.a2a.utils.delegation._afetch_agent_card_cached",
new=AsyncMock(return_value=agent_card),
),
patch("crewai.a2a.utils.delegation.validate_auth_against_agent_card"),
patch(
"crewai.a2a.utils.delegation.validate_required_extensions",
return_value=[],
),
patch(
"crewai.a2a.utils.delegation.negotiate_transport",
return_value=negotiated,
),
patch(
"crewai.a2a.utils.delegation.negotiate_content_types",
return_value=MagicMock(output_modes=None),
),
patch(
"crewai.a2a.utils.delegation._prepare_auth_headers",
new=AsyncMock(return_value=({}, None)),
),
patch("crewai.a2a.utils.delegation.crewai_event_bus"),
patch(
"crewai.a2a.utils.delegation._create_a2a_client",
side_effect=_fake_client_ctx,
),
]
async def _call_impl(agent_card: AgentCard, updates=None) -> None:
await _aexecute_a2a_delegation_impl(
endpoint="http://localhost:9999/",
auth=None,
timeout=30,
task_description="test task",
context=None,
context_id=None,
task_id=None,
reference_task_ids=None,
metadata=None,
extensions=None,
conversation_history=[],
is_multiturn=False,
turn_number=1,
agent_branch=None,
agent_id=None,
agent_role=None,
response_model=None,
updates=updates,
)
class TestStreamingFallback:
"""Tests that the delegation respects the agent card's streaming capability."""
@pytest.mark.asyncio
async def test_uses_polling_when_agent_card_says_no_streaming(self) -> None:
"""When streaming=False and updates is None, PollingHandler should be used."""
agent_card = _make_agent_card(streaming=False)
with ExitStack() as stack:
for p in _make_shared_patches(agent_card):
stack.enter_context(p)
mock_polling = stack.enter_context(
patch.object(PollingHandler, "execute", new=AsyncMock(return_value=_TASK_RESULT))
)
mock_streaming = stack.enter_context(
patch.object(StreamingHandler, "execute", new=AsyncMock(return_value=_TASK_RESULT))
)
await _call_impl(agent_card, updates=None)
mock_polling.assert_called_once()
mock_streaming.assert_not_called()
@pytest.mark.asyncio
async def test_uses_streaming_when_agent_card_says_streaming(self) -> None:
"""When streaming=True and updates is None, StreamingHandler should be used."""
agent_card = _make_agent_card(streaming=True)
with ExitStack() as stack:
for p in _make_shared_patches(agent_card):
stack.enter_context(p)
mock_polling = stack.enter_context(
patch.object(PollingHandler, "execute", new=AsyncMock(return_value=_TASK_RESULT))
)
mock_streaming = stack.enter_context(
patch.object(StreamingHandler, "execute", new=AsyncMock(return_value=_TASK_RESULT))
)
await _call_impl(agent_card, updates=None)
mock_streaming.assert_called_once()
mock_polling.assert_not_called()
@pytest.mark.asyncio
async def test_uses_streaming_when_agent_card_streaming_is_none(self) -> None:
"""When streaming is unset (None) and updates is None, StreamingHandler should be used."""
agent_card = _make_agent_card(streaming=None)
with ExitStack() as stack:
for p in _make_shared_patches(agent_card):
stack.enter_context(p)
mock_polling = stack.enter_context(
patch.object(PollingHandler, "execute", new=AsyncMock(return_value=_TASK_RESULT))
)
mock_streaming = stack.enter_context(
patch.object(StreamingHandler, "execute", new=AsyncMock(return_value=_TASK_RESULT))
)
await _call_impl(agent_card, updates=None)
mock_streaming.assert_called_once()
mock_polling.assert_not_called()
@pytest.mark.asyncio
async def test_explicit_streaming_config_overrides_agent_card(self) -> None:
"""Explicitly passing StreamingConfig keeps StreamingHandler even when agent card says no streaming."""
from crewai.a2a.updates.streaming.config import StreamingConfig
agent_card = _make_agent_card(streaming=False)
with ExitStack() as stack:
for p in _make_shared_patches(agent_card):
stack.enter_context(p)
mock_polling = stack.enter_context(
patch.object(PollingHandler, "execute", new=AsyncMock(return_value=_TASK_RESULT))
)
mock_streaming = stack.enter_context(
patch.object(StreamingHandler, "execute", new=AsyncMock(return_value=_TASK_RESULT))
)
await _call_impl(agent_card, updates=StreamingConfig())
# explicit config overrides agent card; streaming handler is used
mock_streaming.assert_called_once()
mock_polling.assert_not_called()