From 6fe34644abfc8faf72454394976a3bed5527f5be Mon Sep 17 00:00:00 2001 From: Greyson Lalonde Date: Fri, 8 May 2026 21:33:24 +0800 Subject: [PATCH] refactor(a2a): use tool calling for delegation instead of structured output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each remote A2A agent is now exposed to the local LLM as a BaseTool (delegate_to_); the local agent's tool-call loop drives multi-turn delegation. The Literal-constrained AgentResponse model and the explicit per-turn re-prompting loop are gone. Closes #3897. The original failure mode — Pydantic literal_error when skill.id != endpoint URL, and Gemini flash-lite hallucinating out-of-enum values — is structurally impossible: provider-side tool-call validation enforces the tool name, and there's no competing identifier. --- lib/crewai/src/crewai/a2a/templates.py | 51 +- lib/crewai/src/crewai/a2a/tools.py | 394 ++++ lib/crewai/src/crewai/a2a/types.py | 11 - .../src/crewai/a2a/utils/response_model.py | 82 +- lib/crewai/src/crewai/a2a/wrapper.py | 1728 +++-------------- lib/crewai/src/crewai/agent/core.py | 16 +- lib/crewai/src/crewai/lite_agent.py | 5 +- .../test_a2a_trust_completion_status.py | 196 +- 8 files changed, 752 insertions(+), 1731 deletions(-) create mode 100644 lib/crewai/src/crewai/a2a/tools.py diff --git a/lib/crewai/src/crewai/a2a/templates.py b/lib/crewai/src/crewai/a2a/templates.py index 16f0c479e..893d0dfd9 100644 --- a/lib/crewai/src/crewai/a2a/templates.py +++ b/lib/crewai/src/crewai/a2a/templates.py @@ -1,23 +1,20 @@ -"""String templates for A2A (Agent-to-Agent) protocol messaging and status.""" +"""String templates for A2A (Agent-to-Agent) delegation prompts.""" from string import Template from typing import Final AVAILABLE_AGENTS_TEMPLATE: Final[Template] = Template( - "\n\n $available_a2a_agents\n\n" -) -PREVIOUS_A2A_CONVERSATION_TEMPLATE: Final[Template] = Template( - "\n\n" - " $previous_a2a_conversation" - "\n\n" -) -CONVERSATION_TURN_INFO_TEMPLATE: Final[Template] = Template( - "\n\n" - ' turn="$turn_count"\n' - ' max_turns="$max_turns"\n' - " $warning" - "\n\n" + "\n\n" + "You can delegate to remote agents using the delegate_to_* tools below. " + "Each tool's description lists the remote agent's capabilities — call the " + "tool whose capabilities best match the task. Pass the question or sub-task " + "to the remote agent via the tool's `message` argument; the tool returns " + "the remote agent's response, which you should incorporate into your final " + "answer. If the available agents are not a good fit, answer directly " + "without calling a delegation tool.\n\n" + " $available_a2a_agents" + "\n\n" ) UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template( "\n\n" @@ -27,29 +24,3 @@ UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template( " $unavailable_agents" "\n\n" ) -REMOTE_AGENT_COMPLETED_NOTICE: Final[str] = """ - -STATUS: COMPLETED -The remote agent has finished processing your request. Their response is in the conversation history above. -You MUST now: -1. Extract the answer from the conversation history -2. Set is_a2a=false -3. Return the answer as your final message -DO NOT send another request - the task is already done. - -""" - -REMOTE_AGENT_RESPONSE_NOTICE: Final[str] = """ - -STATUS: RESPONSE_RECEIVED -The remote agent has responded. Their response is in the conversation history above. - -You MUST now: -1. Set is_a2a=false (the remote task is complete and cannot receive more messages) -2. Provide YOUR OWN response to the original task based on the information received - -IMPORTANT: Your response should be addressed to the USER who gave you the original task. -Report what the remote agent told you in THIRD PERSON (e.g., "The remote agent said..." or "I learned that..."). -Do NOT address the remote agent directly or use "you" to refer to them. - -""" diff --git a/lib/crewai/src/crewai/a2a/tools.py b/lib/crewai/src/crewai/a2a/tools.py new file mode 100644 index 000000000..200eeb1e8 --- /dev/null +++ b/lib/crewai/src/crewai/a2a/tools.py @@ -0,0 +1,394 @@ +"""Tool-based A2A delegation. + +Each remote A2A agent is exposed to the local LLM as a BaseTool. The local +agent's normal tool-call loop drives multi-turn delegation: each tool call is +one turn against the remote agent. Per-endpoint conversation state lives in +``A2ADelegationState`` and is shared across the tools built for a single task. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any + +from a2a.types import Role, TaskState +from pydantic import BaseModel, Field, PrivateAttr + +from crewai.a2a.config import A2AClientConfig, A2AConfig +from crewai.a2a.extensions.base import ( + A2AExtension, + ConversationState, + ExtensionRegistry, +) +from crewai.a2a.task_helpers import TaskStateResult +from crewai.a2a.utils.delegation import aexecute_a2a_delegation, execute_a2a_delegation +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.a2a_events import A2AConversationCompletedEvent +from crewai.tools.base_tool import BaseTool +from crewai.utilities.string_utils import sanitize_tool_name + + +if TYPE_CHECKING: + from a2a.types import AgentCard, Message + + from crewai.task import Task + + +_DELEGATE_PREFIX = "delegate_to_" + + +@dataclass +class _EndpointState: + """Mutable per-endpoint conversation state across tool calls.""" + + conversation_history: list[Message] = field(default_factory=list) + context_id: str | None = None + task_id: str | None = None + reference_task_ids: list[str] = field(default_factory=list) + turn_count: int = 0 + + +@dataclass +class A2ADelegationState: + """State shared across all A2A delegation tools for a single task execution.""" + + agent: Any + task: Task + extension_registry: ExtensionRegistry | None = None + _per_endpoint: dict[str, _EndpointState] = field(default_factory=dict) + + def _state_for(self, endpoint: str) -> _EndpointState: + return self._per_endpoint.setdefault(endpoint, _EndpointState()) + + def _initial_ids_from_task(self, state: _EndpointState) -> None: + if state.turn_count > 0: + return + task_config = self.task.config or {} + if state.context_id is None: + state.context_id = task_config.get("context_id") + if state.task_id is None: + state.task_id = task_config.get("task_id") + if not state.reference_task_ids: + state.reference_task_ids = list(task_config.get("reference_task_ids", [])) + + def delegate( + self, + config: A2AConfig | A2AClientConfig, + agent_card: AgentCard | None, + message: str, + ) -> str: + """Run one delegation turn against ``config.endpoint``. + + Returns the remote agent's response text, suitable for handing back to + the local LLM as a tool result. + """ + return _run_delegation(self, config, agent_card, message, sync=True) + + async def adelegate( + self, + config: A2AConfig | A2AClientConfig, + agent_card: AgentCard | None, + message: str, + ) -> str: + """Async variant of :meth:`delegate`.""" + return await _run_delegation_async(self, config, agent_card, message) + + +class _A2ADelegationArgs(BaseModel): + """Argument schema for A2A delegation tools.""" + + message: str = Field( + ..., + description=( + "The question or task to send to the remote agent. Be specific and " + "self-contained: the remote agent does not see your other tools or " + "your prior reasoning." + ), + ) + + +class A2ADelegationTool(BaseTool): + """BaseTool that delegates one turn of conversation to a remote A2A agent. + + Each instance is bound to a specific A2A endpoint via ``_config``. Calling + ``_run`` or ``_arun`` advances that endpoint's conversation by one turn and + returns the remote agent's response text. + """ + + args_schema: type[BaseModel] = _A2ADelegationArgs + + _config: A2AConfig | A2AClientConfig = PrivateAttr() + _agent_card: AgentCard | None = PrivateAttr(default=None) + _state: A2ADelegationState = PrivateAttr() + + def _run(self, message: str) -> str: + return self._state.delegate(self._config, self._agent_card, message) + + async def _arun(self, message: str) -> str: + return await self._state.adelegate(self._config, self._agent_card, message) + + +def build_a2a_tools( + a2a_agents: list[A2AConfig | A2AClientConfig], + agent_cards: dict[str, AgentCard], + state: A2ADelegationState, +) -> list[BaseTool]: + """Build one ``A2ADelegationTool`` per available A2A agent. + + Tool names collide-disambiguate with a numeric suffix; agents whose cards + failed to fetch are skipped. + """ + tools: list[BaseTool] = [] + used_names: set[str] = set() + for config in a2a_agents: + card = agent_cards.get(config.endpoint) + if card is None: + continue + name = _build_tool_name(card.name or "remote_agent", used_names) + used_names.add(name) + tool = A2ADelegationTool( + name=name, + description=_build_tool_description(card), + max_usage_count=config.max_turns, + ) + tool._config = config + tool._agent_card = card + tool._state = state + tools.append(tool) + return tools + + +def _build_tool_name(card_name: str, used: set[str]) -> str: + base = sanitize_tool_name(f"{_DELEGATE_PREFIX}{card_name}") + if base not in used: + return base + for i in range(2, 1000): + candidate = sanitize_tool_name(f"{base}_{i}") + if candidate not in used: + return candidate + raise ValueError(f"Could not generate unique tool name for {card_name!r}") + + +def _build_tool_description(card: AgentCard) -> str: + lines: list[str] = [f"Delegate a task to the remote A2A agent {card.name!r}."] + if card.description: + lines.append(card.description.strip()) + if card.skills: + skill_names = ", ".join(s.name for s in card.skills if s.name) + if skill_names: + lines.append(f"Capabilities: {skill_names}.") + lines.append( + "Use this tool only when the question matches the agent's capabilities. " + "After receiving a response, prefer answering directly unless you need " + "another round-trip." + ) + return "\n".join(lines) + + +def _run_delegation( + state: A2ADelegationState, + config: A2AConfig | A2AClientConfig, + agent_card: AgentCard | None, + message: str, + *, + sync: bool, +) -> str: + endpoint_state = state._state_for(config.endpoint) + state._initial_ids_from_task(endpoint_state) + + extension_states = _extract_extension_states(state, endpoint_state) + metadata = _merged_metadata(state, endpoint_state, extension_states) + agent_branch, accepted_output_modes = _turn_context(config) + + a2a_result = execute_a2a_delegation( + endpoint=config.endpoint, + auth=config.auth, + timeout=config.timeout, + task_description=message, + context_id=endpoint_state.context_id, + task_id=endpoint_state.task_id, + reference_task_ids=endpoint_state.reference_task_ids, + metadata=metadata or None, + extensions=(state.task.config or {}).get("extensions"), + conversation_history=endpoint_state.conversation_history, + agent_id=config.endpoint, + agent_role=Role.user, + agent_branch=agent_branch, + response_model=config.response_model, + turn_number=endpoint_state.turn_count + 1, + updates=config.updates, + transport=config.transport, + from_task=state.task, + from_agent=state.agent, + client_extensions=getattr(config, "extensions", None), + accepted_output_modes=accepted_output_modes, + input_files=state.task.input_files, + ) + return _finalize_turn( + state, endpoint_state, config, agent_card, a2a_result, extension_states + ) + + +async def _run_delegation_async( + state: A2ADelegationState, + config: A2AConfig | A2AClientConfig, + agent_card: AgentCard | None, + message: str, +) -> str: + endpoint_state = state._state_for(config.endpoint) + state._initial_ids_from_task(endpoint_state) + + extension_states = _extract_extension_states(state, endpoint_state) + metadata = _merged_metadata(state, endpoint_state, extension_states) + agent_branch, accepted_output_modes = _turn_context(config) + + a2a_result = await aexecute_a2a_delegation( + endpoint=config.endpoint, + auth=config.auth, + timeout=config.timeout, + task_description=message, + context_id=endpoint_state.context_id, + task_id=endpoint_state.task_id, + reference_task_ids=endpoint_state.reference_task_ids, + metadata=metadata or None, + extensions=(state.task.config or {}).get("extensions"), + conversation_history=endpoint_state.conversation_history, + agent_id=config.endpoint, + agent_role=Role.user, + agent_branch=agent_branch, + response_model=config.response_model, + turn_number=endpoint_state.turn_count + 1, + updates=config.updates, + transport=config.transport, + from_task=state.task, + from_agent=state.agent, + client_extensions=getattr(config, "extensions", None), + accepted_output_modes=accepted_output_modes, + input_files=state.task.input_files, + ) + return _finalize_turn( + state, endpoint_state, config, agent_card, a2a_result, extension_states + ) + + +def _extract_extension_states( + state: A2ADelegationState, + endpoint_state: _EndpointState, +) -> dict[type[A2AExtension], ConversationState]: + if state.extension_registry and endpoint_state.conversation_history: + return state.extension_registry.extract_all_states( + endpoint_state.conversation_history + ) + return {} + + +def _merged_metadata( + state: A2ADelegationState, + endpoint_state: _EndpointState, + extension_states: dict[type[A2AExtension], ConversationState], +) -> dict[str, Any]: + task_config = state.task.config or {} + metadata: dict[str, Any] = dict(task_config.get("metadata") or {}) + if state.extension_registry and extension_states: + metadata.update(state.extension_registry.prepare_all_metadata(extension_states)) + return metadata + + +def _turn_context( + config: A2AConfig | A2AClientConfig, +) -> tuple[Any | None, list[str] | None]: + console_formatter = getattr(crewai_event_bus, "_console", None) + agent_branch = None + if console_formatter: + agent_branch = getattr( + console_formatter, "current_agent_branch", None + ) or getattr(console_formatter, "current_task_branch", None) + + accepted_output_modes = None + if isinstance(config, A2AClientConfig): + accepted_output_modes = config.accepted_output_modes + return agent_branch, accepted_output_modes + + +def _finalize_turn( + state: A2ADelegationState, + endpoint_state: _EndpointState, + config: A2AConfig | A2AClientConfig, + agent_card: AgentCard | None, + a2a_result: TaskStateResult, + extension_states: dict[type[A2AExtension], ConversationState], +) -> str: + endpoint_state.conversation_history = list(a2a_result.get("history", [])) + if endpoint_state.conversation_history: + latest = endpoint_state.conversation_history[-1] + if latest.task_id is not None: + endpoint_state.task_id = latest.task_id + if latest.context_id is not None: + endpoint_state.context_id = latest.context_id + + endpoint_state.turn_count += 1 + status = a2a_result.get("status") + + if status == TaskState.completed: + if ( + endpoint_state.task_id is not None + and endpoint_state.task_id not in endpoint_state.reference_task_ids + ): + endpoint_state.reference_task_ids.append(endpoint_state.task_id) + if state.task.config is None: + state.task.config = {} + state.task.config["reference_task_ids"] = list( + endpoint_state.reference_task_ids + ) + endpoint_state.task_id = None + + result_text = str(a2a_result.get("result", "")) + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="completed", + final_result=result_text, + error=None, + total_turns=endpoint_state.turn_count, + from_task=state.task, + from_agent=state.agent, + endpoint=config.endpoint, + a2a_agent_name=agent_card.name if agent_card else None, + agent_card=agent_card.model_dump() if agent_card else None, + ), + ) + return _apply_response_extensions(state, result_text, extension_states) + + if status == TaskState.input_required: + result_text = str(a2a_result.get("result", "")) + return _apply_response_extensions(state, result_text, extension_states) + + error_msg = a2a_result.get("error", "Unknown error") + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="failed", + final_result=None, + error=error_msg, + total_turns=endpoint_state.turn_count, + from_task=state.task, + from_agent=state.agent, + endpoint=config.endpoint, + a2a_agent_name=agent_card.name if agent_card else None, + agent_card=agent_card.model_dump() if agent_card else None, + ), + ) + return f"Remote agent error: {error_msg}" + + +def _apply_response_extensions( + state: A2ADelegationState, + response_text: str, + extension_states: dict[type[A2AExtension], ConversationState], +) -> str: + if not state.extension_registry: + return response_text + processed = state.extension_registry.process_response_with_all( + response_text, extension_states + ) + return processed if isinstance(processed, str) else str(processed) diff --git a/lib/crewai/src/crewai/a2a/types.py b/lib/crewai/src/crewai/a2a/types.py index 5b06f8b8b..a5bb63149 100644 --- a/lib/crewai/src/crewai/a2a/types.py +++ b/lib/crewai/src/crewai/a2a/types.py @@ -6,8 +6,6 @@ from typing import ( Annotated, Any, Literal, - Protocol, - runtime_checkable, ) from pydantic import BeforeValidator, HttpUrl, TypeAdapter @@ -57,15 +55,6 @@ Url = Annotated[ ] -@runtime_checkable -class AgentResponseProtocol(Protocol): - """Protocol for the dynamically created AgentResponse model.""" - - a2a_ids: tuple[str, ...] - message: str - is_a2a: bool - - class PartsMetadataDict(TypedDict, total=False): """Metadata for A2A message parts. diff --git a/lib/crewai/src/crewai/a2a/utils/response_model.py b/lib/crewai/src/crewai/a2a/utils/response_model.py index 1359e2f10..b95955143 100644 --- a/lib/crewai/src/crewai/a2a/utils/response_model.py +++ b/lib/crewai/src/crewai/a2a/utils/response_model.py @@ -1,75 +1,25 @@ -"""Response model utilities for A2A agent interactions.""" +"""Helpers for extracting A2A client configurations.""" from __future__ import annotations from typing import TypeAlias -from pydantic import BaseModel, Field, create_model - from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig -from crewai.types.utils import create_literals_from_strings A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig -def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel] | None: - """Create a dynamic AgentResponse model with Literal types for agent IDs. - - Args: - agent_ids: List of available A2A agent IDs. - - Returns: - Dynamically created Pydantic model with Literal-constrained a2a_ids field, - or None if agent_ids is empty. - """ - if not agent_ids: - return None - - DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806 - - return create_model( - "AgentResponse", - a2a_ids=( - tuple[DynamicLiteral, ...], # type: ignore[valid-type] - Field( - default_factory=tuple, - max_length=len(agent_ids), - description="A2A agent IDs to delegate to.", - ), - ), - message=( - str, - Field( - description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation." - ), - ), - is_a2a=( - bool, - Field( - description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately." - ), - ), - __base__=BaseModel, - ) - - -def extract_a2a_agent_ids_from_config( +def extract_a2a_client_configs( a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None, -) -> tuple[list[A2AClientConfigTypes], tuple[str, ...]]: - """Extract A2A agent IDs from A2A configuration. +) -> list[A2AClientConfigTypes]: + """Return the client-side A2A configs from a possibly-mixed config list. - Filters out A2AServerConfig since it doesn't have an endpoint for delegation. - - Args: - a2a_config: A2A configuration (any type). - - Returns: - Tuple of client A2A configs list and agent endpoint IDs. + Filters out :class:`A2AServerConfig`, which has no endpoint to delegate to. """ if a2a_config is None: - return [], () + return [] configs: list[A2AConfigTypes] if isinstance(a2a_config, (A2AConfig, A2AClientConfig, A2AServerConfig)): @@ -77,24 +27,6 @@ def extract_a2a_agent_ids_from_config( else: configs = a2a_config - client_configs: list[A2AClientConfigTypes] = [ + return [ config for config in configs if isinstance(config, (A2AConfig, A2AClientConfig)) ] - - return client_configs, tuple(config.endpoint for config in client_configs) - - -def get_a2a_agents_and_response_model( - a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None, -) -> tuple[list[A2AClientConfigTypes], type[BaseModel] | None]: - """Get A2A agent configs and response model. - - Args: - a2a_config: A2A configuration (any type). - - Returns: - Tuple of client A2A configs and response model. - """ - a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config) - - return a2a_agents, create_agent_response_model(agent_ids) diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index 0ec7fc6ae..03f60993f 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -1,108 +1,51 @@ """A2A agent wrapping logic for metaclass integration. -Wraps agent classes with A2A delegation capabilities. +Wraps agent classes with A2A delegation capabilities. Each remote A2A agent +is exposed to the local LLM as a BaseTool; the local agent's tool-call loop +drives multi-turn delegation. """ from __future__ import annotations import asyncio -from collections.abc import Callable, Coroutine, Mapping +from collections.abc import Callable, Coroutine, Iterator, Mapping from concurrent.futures import ThreadPoolExecutor, as_completed +import contextlib import contextvars from functools import wraps import json from types import MethodType -from typing import TYPE_CHECKING, Any, NamedTuple - -from a2a.types import Role, TaskState -from pydantic import BaseModel, ValidationError +from typing import TYPE_CHECKING, Any from crewai.a2a.config import A2AClientConfig, A2AConfig -from crewai.a2a.extensions.base import ( - A2AExtension, - ConversationState, - ExtensionRegistry, -) -from crewai.a2a.task_helpers import TaskStateResult +from crewai.a2a.extensions.base import ExtensionRegistry from crewai.a2a.templates import ( AVAILABLE_AGENTS_TEMPLATE, - CONVERSATION_TURN_INFO_TEMPLATE, - PREVIOUS_A2A_CONVERSATION_TEMPLATE, - REMOTE_AGENT_RESPONSE_NOTICE, UNAVAILABLE_AGENTS_NOTICE_TEMPLATE, ) -from crewai.a2a.types import AgentResponseProtocol +from crewai.a2a.tools import A2ADelegationState, build_a2a_tools from crewai.a2a.utils.agent_card import ( afetch_agent_card, fetch_agent_card, inject_a2a_server_methods, ) -from crewai.a2a.utils.delegation import ( - aexecute_a2a_delegation, - execute_a2a_delegation, -) -from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model -from crewai.events.event_bus import crewai_event_bus -from crewai.events.types.a2a_events import ( - A2AConversationCompletedEvent, - A2AMessageSentEvent, -) +from crewai.a2a.utils.response_model import extract_a2a_client_configs from crewai.lite_agent_output import LiteAgentOutput from crewai.task import Task if TYPE_CHECKING: - from a2a.types import AgentCard, Message + from a2a.types import AgentCard from crewai.agent.core import Agent from crewai.tools.base_tool import BaseTool -class DelegationContext(NamedTuple): - """Context prepared for A2A delegation. - - Groups all the values needed to execute a delegation to a remote A2A agent. - """ - - a2a_agents: list[A2AConfig | A2AClientConfig] - agent_response_model: type[BaseModel] | None - current_request: str - agent_id: str - agent_config: A2AConfig | A2AClientConfig - context_id: str | None - task_id: str | None - metadata: dict[str, Any] | None - extensions: dict[str, Any] | None - reference_task_ids: list[str] - original_task_description: str - max_turns: int - - -class DelegationState(NamedTuple): - """Mutable state for A2A delegation loop. - - Groups values that may change during delegation turns. - """ - - current_request: str - context_id: str | None - task_id: str | None - reference_task_ids: list[str] - conversation_history: list[Message] - agent_card: AgentCard | None - agent_card_dict: dict[str, Any] | None - agent_name: str | None - - def wrap_agent_with_a2a_instance( agent: Agent, extension_registry: ExtensionRegistry | None = None ) -> None: """Wrap an agent instance's task execution and kickoff methods with A2A support. - This function modifies the agent instance by wrapping its execute_task, - aexecute_task, kickoff, and kickoff_async methods to add A2A delegation - capabilities. Should only be called when the agent has a2a configuration set. - Args: agent: The agent instance to wrap. extension_registry: Optional registry of A2A extensions. @@ -126,14 +69,13 @@ def wrap_agent_with_a2a_instance( if not self.a2a: return original_execute_task(self, task, context, tools) # type: ignore[no-any-return] - a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) + a2a_agents = extract_a2a_client_configs(self.a2a) return _execute_task_with_a2a( self=self, a2a_agents=a2a_agents, original_fn=original_execute_task, task=task, - agent_response_model=agent_response_model, context=context, tools=tools, extension_registry=extension_registry, @@ -150,14 +92,13 @@ def wrap_agent_with_a2a_instance( if not self.a2a: return await original_aexecute_task(self, task, context, tools) # type: ignore[no-any-return] - a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) + a2a_agents = extract_a2a_client_configs(self.a2a) return await _aexecute_task_with_a2a( self=self, a2a_agents=a2a_agents, original_fn=original_aexecute_task, task=task, - agent_response_model=agent_response_model, context=context, tools=tools, extension_registry=extension_registry, @@ -182,8 +123,7 @@ def wrap_agent_with_a2a_instance( if not self.a2a: return original_kickoff(self, messages, response_format, input_files) - a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) - + a2a_agents = extract_a2a_client_configs(self.a2a) if not a2a_agents: return original_kickoff(self, messages, response_format, input_files) @@ -194,7 +134,6 @@ def wrap_agent_with_a2a_instance( messages=messages, response_format=response_format, input_files=input_files, - agent_response_model=agent_response_model, extension_registry=extension_registry, ) @@ -211,8 +150,7 @@ def wrap_agent_with_a2a_instance( self, messages, response_format, input_files ) - a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) - + a2a_agents = extract_a2a_client_configs(self.a2a) if not a2a_agents: return await original_kickoff_async( self, messages, response_format, input_files @@ -225,7 +163,6 @@ def wrap_agent_with_a2a_instance( messages=messages, response_format=response_format, input_files=input_files, - agent_response_model=agent_response_model, extension_registry=extension_registry, ) @@ -240,14 +177,7 @@ def wrap_agent_with_a2a_instance( def _fetch_card_from_config( config: A2AConfig | A2AClientConfig, ) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]: - """Fetch agent card from A2A config. - - Args: - config: A2A configuration - - Returns: - Tuple of (config, card or exception) - """ + """Fetch an agent card synchronously, capturing any exception.""" try: card = fetch_agent_card( endpoint=config.endpoint, @@ -262,14 +192,7 @@ def _fetch_card_from_config( def _fetch_agent_cards_concurrently( a2a_agents: list[A2AConfig | A2AClientConfig], ) -> tuple[dict[str, AgentCard], dict[str, str]]: - """Fetch agent cards concurrently for multiple A2A agents. - - Args: - a2a_agents: List of A2A agent configurations - - Returns: - Tuple of (agent_cards dict, failed_agents dict mapping endpoint to error message) - """ + """Fetch agent cards concurrently for multiple A2A agents.""" agent_cards: dict[str, AgentCard] = {} failed_agents: dict[str, str] = {} @@ -299,1129 +222,10 @@ def _fetch_agent_cards_concurrently( return agent_cards, failed_agents -def _execute_task_with_a2a( - self: Agent, - a2a_agents: list[A2AConfig | A2AClientConfig], - original_fn: Callable[..., str], - task: Task, - agent_response_model: type[BaseModel] | None, - context: str | None, - tools: list[BaseTool] | None, - extension_registry: ExtensionRegistry, -) -> str: - """Wrap execute_task with A2A delegation logic. - - Args: - self: The agent instance - a2a_agents: Dictionary of A2A agent configurations - original_fn: The original execute_task method - task: The task to execute - context: Optional context for task execution - tools: Optional tools available to the agent - agent_response_model: Optional agent response model - extension_registry: Registry of A2A extensions - - Returns: - Task execution result (either from LLM or A2A agent) - """ - original_description: str = task.description - original_output_pydantic = task.output_pydantic - original_response_model = task.response_model - - agent_cards, failed_agents = _fetch_agent_cards_concurrently(a2a_agents) - - if not agent_cards and a2a_agents and failed_agents: - unavailable_agents_text = "" - for endpoint, error in failed_agents.items(): - unavailable_agents_text += f" - {endpoint}: {error}\n" - - notice = UNAVAILABLE_AGENTS_NOTICE_TEMPLATE.substitute( - unavailable_agents=unavailable_agents_text - ) - task.description = f"{original_description}{notice}" - - try: - return original_fn(self, task, context, tools) - finally: - task.description = original_description - - task.description, _, extension_states = _augment_prompt_with_a2a( - a2a_agents=a2a_agents, - task_description=original_description, - agent_cards=agent_cards, - failed_agents=failed_agents, - extension_registry=extension_registry, - ) - task.response_model = agent_response_model - - try: - raw_result = original_fn(self, task, context, tools) - agent_response = _parse_agent_response( - raw_result=raw_result, agent_response_model=agent_response_model - ) - - if extension_registry and isinstance(agent_response, BaseModel): - agent_response = extension_registry.process_response_with_all( - agent_response, extension_states - ) - - if isinstance(agent_response, BaseModel) and isinstance( - agent_response, AgentResponseProtocol - ): - if agent_response.is_a2a: - return _delegate_to_a2a( - self, - agent_response=agent_response, - task=task, - original_fn=original_fn, - context=context, - tools=tools, - agent_cards=agent_cards, - original_task_description=original_description, - _extension_registry=extension_registry, - ) - task.output_pydantic = None - return agent_response.message - - return raw_result - finally: - task.description = original_description - task.output_pydantic = original_output_pydantic - task.response_model = original_response_model - - -def _kickoff_with_a2a( - self: Agent, - a2a_agents: list[A2AConfig | A2AClientConfig], - original_kickoff: Callable[..., LiteAgentOutput], - messages: str | list[Any], - response_format: type[Any] | None, - input_files: dict[str, Any] | None, - agent_response_model: type[BaseModel] | None, - extension_registry: ExtensionRegistry, -) -> LiteAgentOutput: - """Execute kickoff with A2A delegation support (sync). - - Args: - self: The agent instance. - a2a_agents: List of A2A agent configurations. - original_kickoff: The original kickoff method. - messages: Messages to send to the agent. - response_format: Optional response format. - input_files: Optional input files. - agent_response_model: Optional agent response model. - extension_registry: Registry of A2A extensions. - - Returns: - LiteAgentOutput from kickoff or A2A delegation. - """ - if isinstance(messages, str): - description = messages - else: - content = next( - (m["content"] for m in reversed(messages) if m["role"] == "user"), - None, - ) - description = content if isinstance(content, str) else "" - - if not description: - return original_kickoff(self, messages, response_format, input_files) - - fake_task = Task( - description=description, - agent=self, - expected_output="Result from A2A delegation", - input_files=input_files or {}, - ) - - agent_cards, failed_agents = _fetch_agent_cards_concurrently(a2a_agents) - - if not agent_cards and a2a_agents and failed_agents: - return original_kickoff(self, messages, response_format, input_files) - - fake_task.description, _, extension_states = _augment_prompt_with_a2a( - a2a_agents=a2a_agents, - task_description=description, - agent_cards=agent_cards, - failed_agents=failed_agents, - extension_registry=extension_registry, - ) - fake_task.response_model = agent_response_model - - try: - result: LiteAgentOutput = original_kickoff( - self, messages, agent_response_model or response_format, input_files - ) - agent_response = _parse_agent_response( - raw_result=result.raw, agent_response_model=agent_response_model - ) - - if extension_registry and isinstance(agent_response, BaseModel): - agent_response = extension_registry.process_response_with_all( - agent_response, extension_states - ) - - if isinstance(agent_response, BaseModel) and isinstance( - agent_response, AgentResponseProtocol - ): - if agent_response.is_a2a: - - def _kickoff_adapter( - self_: Agent, - _task: Task, - _context: str | None, - _tools: list[Any] | None, - ) -> str: - fmt = ( - _task.response_model or agent_response_model or response_format - ) - output: LiteAgentOutput = original_kickoff( - self_, messages, fmt, input_files - ) - return output.raw - - result_str = _delegate_to_a2a( - self, - agent_response=agent_response, - task=fake_task, - original_fn=_kickoff_adapter, - context=None, - tools=None, - agent_cards=agent_cards, - original_task_description=description, - _extension_registry=extension_registry, - ) - return LiteAgentOutput( - raw=result_str, - pydantic=None, - agent_role=self.role, - usage_metrics=None, - messages=[], - ) - return LiteAgentOutput( - raw=agent_response.message, - pydantic=None, - agent_role=self.role, - usage_metrics=result.usage_metrics, - messages=result.messages, - ) - - return result - finally: - fake_task.description = description - - -async def _akickoff_with_a2a( - self: Agent, - a2a_agents: list[A2AConfig | A2AClientConfig], - original_kickoff_async: Callable[..., Coroutine[Any, Any, LiteAgentOutput]], - messages: str | list[Any], - response_format: type[Any] | None, - input_files: dict[str, Any] | None, - agent_response_model: type[BaseModel] | None, - extension_registry: ExtensionRegistry, -) -> LiteAgentOutput: - """Execute kickoff with A2A delegation support (async). - - Args: - self: The agent instance. - a2a_agents: List of A2A agent configurations. - original_kickoff_async: The original kickoff_async method. - messages: Messages to send to the agent. - response_format: Optional response format. - input_files: Optional input files. - agent_response_model: Optional agent response model. - extension_registry: Registry of A2A extensions. - - Returns: - LiteAgentOutput from kickoff or A2A delegation. - """ - if isinstance(messages, str): - description = messages - else: - content = next( - (m["content"] for m in reversed(messages) if m["role"] == "user"), - None, - ) - description = content if isinstance(content, str) else "" - - if not description: - return await original_kickoff_async( - self, messages, response_format, input_files - ) - - fake_task = Task( - description=description, - agent=self, - expected_output="Result from A2A delegation", - input_files=input_files or {}, - ) - - agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_agents) - - if not agent_cards and a2a_agents and failed_agents: - return await original_kickoff_async( - self, messages, response_format, input_files - ) - - fake_task.description, _, extension_states = _augment_prompt_with_a2a( - a2a_agents=a2a_agents, - task_description=description, - agent_cards=agent_cards, - failed_agents=failed_agents, - extension_registry=extension_registry, - ) - fake_task.response_model = agent_response_model - - try: - result: LiteAgentOutput = await original_kickoff_async( - self, messages, agent_response_model or response_format, input_files - ) - agent_response = _parse_agent_response( - raw_result=result.raw, agent_response_model=agent_response_model - ) - - if extension_registry and isinstance(agent_response, BaseModel): - agent_response = extension_registry.process_response_with_all( - agent_response, extension_states - ) - - if isinstance(agent_response, BaseModel) and isinstance( - agent_response, AgentResponseProtocol - ): - if agent_response.is_a2a: - - async def _kickoff_adapter( - self_: Agent, - _task: Task, - _context: str | None, - _tools: list[Any] | None, - ) -> str: - fmt = ( - _task.response_model or agent_response_model or response_format - ) - output: LiteAgentOutput = await original_kickoff_async( - self_, messages, fmt, input_files - ) - return output.raw - - result_str = await _adelegate_to_a2a( - self, - agent_response=agent_response, - task=fake_task, - original_fn=_kickoff_adapter, - context=None, - tools=None, - agent_cards=agent_cards, - original_task_description=description, - _extension_registry=extension_registry, - ) - return LiteAgentOutput( - raw=result_str, - pydantic=None, - agent_role=self.role, - usage_metrics=None, - messages=[], - ) - return LiteAgentOutput( - raw=agent_response.message, - pydantic=None, - agent_role=self.role, - usage_metrics=result.usage_metrics, - messages=result.messages, - ) - - return result - finally: - fake_task.description = description - - -def _augment_prompt_with_a2a( - a2a_agents: list[A2AConfig | A2AClientConfig], - task_description: str, - agent_cards: Mapping[str, AgentCard | dict[str, Any]], - conversation_history: list[Message] | None = None, - turn_num: int = 0, - max_turns: int | None = None, - failed_agents: dict[str, str] | None = None, - extension_registry: ExtensionRegistry | None = None, - remote_status_notice: str = "", -) -> tuple[str, bool, dict[type[A2AExtension], ConversationState]]: - """Add A2A delegation instructions to prompt. - - Args: - a2a_agents: Dictionary of A2A agent configurations - task_description: Original task description - agent_cards: dictionary mapping agent IDs to AgentCards - conversation_history: Previous A2A Messages from conversation - turn_num: Current turn number (0-indexed) - max_turns: Maximum allowed turns (from config) - failed_agents: Dictionary mapping failed agent endpoints to error messages - extension_registry: Optional registry of A2A extensions - remote_status_notice: Optional notice about remote agent status to append - - Returns: - Tuple of (augmented prompt, disable_structured_output flag, extension_states dict) - """ - - if not agent_cards: - return task_description, False, {} - - agents_text = "" - - for config in a2a_agents: - if config.endpoint in agent_cards: - card = agent_cards[config.endpoint] - if isinstance(card, dict): - filtered = { - k: v - for k, v in card.items() - if k in {"description", "url", "skills"} and v is not None - } - agents_text += f"\n{json.dumps(filtered, indent=2)}\n" - else: - agents_text += f"\n{card.model_dump_json(indent=2, exclude_none=True, include={'description', 'url', 'skills'})}\n" - - failed_agents = failed_agents or {} - if failed_agents: - agents_text += "\n\n" - for endpoint, error in failed_agents.items(): - agents_text += f"\n\n" - - agents_text = AVAILABLE_AGENTS_TEMPLATE.substitute(available_a2a_agents=agents_text) - - history_text = "" - - if conversation_history: - for msg in conversation_history: - history_text += f"\n{msg.model_dump_json(indent=2, exclude_none=True, exclude={'message_id'})}\n" - - history_text = PREVIOUS_A2A_CONVERSATION_TEMPLATE.substitute( - previous_a2a_conversation=history_text - ) - - extension_states = {} - disable_structured_output = False - if extension_registry and conversation_history: - extension_states = extension_registry.extract_all_states(conversation_history) - for state in extension_states.values(): - if state.is_ready(): - disable_structured_output = True - break - turn_info = "" - - if max_turns is not None and conversation_history: - turn_count = turn_num + 1 - warning = "" - if turn_count >= max_turns: - warning = ( - "CRITICAL: This is the FINAL turn. You MUST conclude the conversation now.\n" - "Set is_a2a=false and provide your final response to complete the task." - ) - elif turn_count == max_turns - 1: - warning = "WARNING: Next turn will be the last. Consider wrapping up the conversation." - - turn_info = CONVERSATION_TURN_INFO_TEMPLATE.substitute( - turn_count=turn_count, - max_turns=max_turns, - warning=warning, - ) - - augmented_prompt = f"""{task_description} - -IMPORTANT: You have the ability to delegate this task to remote A2A agents. -{agents_text} -{history_text}{turn_info}{remote_status_notice} - -""" - - if extension_registry: - augmented_prompt = extension_registry.augment_prompt_with_all( - augmented_prompt, extension_states - ) - - return augmented_prompt, disable_structured_output, extension_states - - -def _parse_agent_response( - raw_result: str | dict[str, Any], agent_response_model: type[BaseModel] | None -) -> BaseModel | str | dict[str, Any]: - """Parse LLM output as AgentResponse or return raw agent response.""" - if agent_response_model: - try: - if isinstance(raw_result, str): - return agent_response_model.model_validate_json(raw_result) - if isinstance(raw_result, dict): - return agent_response_model.model_validate(raw_result) - except ValidationError: - return raw_result - return raw_result - - -def _handle_max_turns_exceeded( - conversation_history: list[Message], - max_turns: int, - from_task: Any | None = None, - from_agent: Any | None = None, - endpoint: str | None = None, - a2a_agent_name: str | None = None, - agent_card: dict[str, Any] | None = None, -) -> str: - """Handle the case when max turns is exceeded. - - Shared logic for both sync and async delegation. - - Returns: - Final message if found in history. - - Raises: - Exception: If no final message found and max turns exceeded. - """ - if conversation_history: - for msg in reversed(conversation_history): - if msg.role == Role.agent: - text_parts = [ - part.root.text for part in msg.parts if part.root.kind == "text" - ] - final_message = ( - " ".join(text_parts) if text_parts else "Conversation completed" - ) - crewai_event_bus.emit( - None, - A2AConversationCompletedEvent( - status="completed", - final_result=final_message, - error=None, - total_turns=max_turns, - from_task=from_task, - from_agent=from_agent, - endpoint=endpoint, - a2a_agent_name=a2a_agent_name, - agent_card=agent_card, - ), - ) - return final_message - - crewai_event_bus.emit( - None, - A2AConversationCompletedEvent( - status="failed", - final_result=None, - error=f"Conversation exceeded maximum turns ({max_turns})", - total_turns=max_turns, - from_task=from_task, - from_agent=from_agent, - endpoint=endpoint, - a2a_agent_name=a2a_agent_name, - agent_card=agent_card, - ), - ) - raise Exception(f"A2A conversation exceeded maximum turns ({max_turns})") - - -def _emit_delegation_failed( - error_msg: str, - turn_num: int, - from_task: Any | None, - from_agent: Any | None, - endpoint: str | None, - a2a_agent_name: str | None, - agent_card: dict[str, Any] | None, -) -> str: - """Emit failure event and return formatted error message.""" - crewai_event_bus.emit( - None, - A2AConversationCompletedEvent( - status="failed", - final_result=None, - error=error_msg, - total_turns=turn_num + 1, - from_task=from_task, - from_agent=from_agent, - endpoint=endpoint, - a2a_agent_name=a2a_agent_name, - agent_card=agent_card, - ), - ) - return f"A2A delegation failed: {error_msg}" - - -def _process_response_result( - raw_result: str, - disable_structured_output: bool, - turn_num: int, - agent_role: str, - agent_response_model: type[BaseModel] | None, - extension_registry: ExtensionRegistry | None = None, - extension_states: dict[type[A2AExtension], ConversationState] | None = None, - from_task: Any | None = None, - from_agent: Any | None = None, - endpoint: str | None = None, - a2a_agent_name: str | None = None, - agent_card: dict[str, Any] | None = None, -) -> tuple[str | None, str | None]: - """Process LLM response and determine next action. - - Shared logic for both sync and async handlers. - - Returns: - Tuple of (final_result, next_request). - """ - if disable_structured_output: - final_turn_number = turn_num + 1 - result_text = str(raw_result) - crewai_event_bus.emit( - None, - A2AMessageSentEvent( - message=result_text, - turn_number=final_turn_number, - is_multiturn=True, - agent_role=agent_role, - from_task=from_task, - from_agent=from_agent, - endpoint=endpoint, - a2a_agent_name=a2a_agent_name, - ), - ) - crewai_event_bus.emit( - None, - A2AConversationCompletedEvent( - status="completed", - final_result=result_text, - error=None, - total_turns=final_turn_number, - from_task=from_task, - from_agent=from_agent, - endpoint=endpoint, - a2a_agent_name=a2a_agent_name, - agent_card=agent_card, - ), - ) - return result_text, None - - llm_response = _parse_agent_response( - raw_result=raw_result, agent_response_model=agent_response_model - ) - - if extension_registry and isinstance(llm_response, BaseModel): - llm_response = extension_registry.process_response_with_all( - llm_response, extension_states or {} - ) - - if isinstance(llm_response, BaseModel) and isinstance( - llm_response, AgentResponseProtocol - ): - if not llm_response.is_a2a: - final_turn_number = turn_num + 1 - crewai_event_bus.emit( - None, - A2AMessageSentEvent( - message=str(llm_response.message), - turn_number=final_turn_number, - is_multiturn=True, - agent_role=agent_role, - from_task=from_task, - from_agent=from_agent, - endpoint=endpoint, - a2a_agent_name=a2a_agent_name, - ), - ) - crewai_event_bus.emit( - None, - A2AConversationCompletedEvent( - status="completed", - final_result=str(llm_response.message), - error=None, - total_turns=final_turn_number, - from_task=from_task, - from_agent=from_agent, - endpoint=endpoint, - a2a_agent_name=a2a_agent_name, - agent_card=agent_card, - ), - ) - return llm_response.message, None - return None, llm_response.message - - return str(raw_result), None - - -def _prepare_agent_cards_dict( - a2a_result: TaskStateResult, - agent_id: str, - agent_cards: Mapping[str, AgentCard | dict[str, Any]] | None, -) -> dict[str, AgentCard | dict[str, Any]]: - """Prepare agent cards dictionary from result and existing cards. - - Shared logic for both sync and async response handlers. - """ - agent_cards_dict: dict[str, AgentCard | dict[str, Any]] = ( - dict(agent_cards) if agent_cards else {} - ) - if "agent_card" in a2a_result and agent_id not in agent_cards_dict: - agent_cards_dict[agent_id] = a2a_result["agent_card"] - return agent_cards_dict - - -def _init_delegation_state( - ctx: DelegationContext, - agent_cards: dict[str, AgentCard] | None, -) -> DelegationState: - """Initialize delegation state from context and agent cards. - - Args: - ctx: Delegation context with config and settings. - agent_cards: Pre-fetched agent cards. - - Returns: - Initial delegation state for the conversation loop. - """ - current_agent_card = agent_cards.get(ctx.agent_id) if agent_cards else None - return DelegationState( - current_request=ctx.current_request, - context_id=ctx.context_id, - task_id=ctx.task_id, - reference_task_ids=list(ctx.reference_task_ids), - conversation_history=[], - agent_card=current_agent_card, - agent_card_dict=current_agent_card.model_dump() if current_agent_card else None, - agent_name=current_agent_card.name if current_agent_card else None, - ) - - -def _get_turn_context( - agent_config: A2AConfig | A2AClientConfig, -) -> tuple[Any | None, list[str] | None]: - """Get context for a delegation turn. - - Returns: - Tuple of (agent_branch, accepted_output_modes). - """ - console_formatter = getattr(crewai_event_bus, "_console", None) - agent_branch = None - if console_formatter: - agent_branch = getattr( - console_formatter, "current_agent_branch", None - ) or getattr(console_formatter, "current_task_branch", None) - - accepted_output_modes = None - if isinstance(agent_config, A2AClientConfig): - accepted_output_modes = agent_config.accepted_output_modes - - return agent_branch, accepted_output_modes - - -def _prepare_delegation_context( - self: Agent, - agent_response: AgentResponseProtocol, - task: Task, - original_task_description: str | None, -) -> DelegationContext: - """Prepare delegation context from agent response and task. - - Shared logic for both sync and async delegation. - - Returns: - DelegationContext with all values needed for delegation. - """ - a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) - agent_ids = tuple(config.endpoint for config in a2a_agents) - current_request = str(agent_response.message) - - if not a2a_agents: - raise ValueError("No A2A agents configured for delegation") - - if isinstance(agent_response, AgentResponseProtocol) and agent_response.a2a_ids: - agent_id = agent_response.a2a_ids[0] - else: - agent_id = agent_ids[0] - - if agent_id not in agent_ids: - raise ValueError(f"Unknown A2A agent ID: {agent_id} not in {agent_ids}") - - agent_config = next(filter(lambda x: x.endpoint == agent_id, a2a_agents), None) - if agent_config is None: - raise ValueError(f"Agent configuration not found for endpoint: {agent_id}") - task_config = task.config or {} - - if original_task_description is None: - original_task_description = task.description - - return DelegationContext( - a2a_agents=a2a_agents, - agent_response_model=agent_response_model, - current_request=current_request, - agent_id=agent_id, - agent_config=agent_config, - context_id=task_config.get("context_id"), - task_id=task_config.get("task_id"), - metadata=task_config.get("metadata"), - extensions=task_config.get("extensions"), - reference_task_ids=task_config.get("reference_task_ids", []), - original_task_description=original_task_description, - max_turns=agent_config.max_turns, - ) - - -def _handle_task_completion( - a2a_result: TaskStateResult, - task: Task, - task_id_config: str | None, - reference_task_ids: list[str], - agent_config: A2AConfig | A2AClientConfig, - turn_num: int, - from_task: Any | None = None, - from_agent: Any | None = None, - endpoint: str | None = None, - a2a_agent_name: str | None = None, - agent_card: dict[str, Any] | None = None, -) -> tuple[str | None, str | None, list[str], str]: - """Handle task completion state including reference task updates. - - When a remote task completes, this function: - 1. Adds the completed task_id to reference_task_ids (if not already present) - 2. Clears task_id_config to signal that a new task ID should be generated for next turn - 3. Updates task.config with the reference list for subsequent A2A calls - - The reference_task_ids list tracks all completed tasks in this conversation chain, - allowing the remote agent to maintain context across multi-turn interactions. - - Shared logic for both sync and async delegation. - - Args: - a2a_result: Result from A2A delegation containing task status. - task: CrewAI Task object to update with reference IDs. - task_id_config: Current task ID (will be added to references if task completed). - reference_task_ids: Mutable list of completed task IDs (updated in place). - agent_config: A2A configuration with trust settings. - turn_num: Current turn number. - from_task: Optional CrewAI Task for event metadata. - from_agent: Optional CrewAI Agent for event metadata. - endpoint: A2A endpoint URL. - a2a_agent_name: Name of remote A2A agent. - agent_card: Agent card dict for event metadata. - - Returns: - Tuple of (result_if_trusted, updated_task_id, updated_reference_task_ids, remote_notice). - - result_if_trusted: Final result if trust_remote_completion_status=True, else None - - updated_task_id: None (cleared to generate new ID for next turn) - - updated_reference_task_ids: The mutated list with completed task added - - remote_notice: Template notice about remote agent response - """ - remote_notice = "" - if a2a_result["status"] == TaskState.completed: - remote_notice = REMOTE_AGENT_RESPONSE_NOTICE - - if task_id_config is not None and task_id_config not in reference_task_ids: - reference_task_ids.append(task_id_config) - - if task.config is None: - task.config = {} - task.config["reference_task_ids"] = list(reference_task_ids) - - task_id_config = None - - if agent_config.trust_remote_completion_status: - result_text = a2a_result.get("result", "") - final_turn_number = turn_num + 1 - crewai_event_bus.emit( - None, - A2AConversationCompletedEvent( - status="completed", - final_result=result_text, - error=None, - total_turns=final_turn_number, - from_task=from_task, - from_agent=from_agent, - endpoint=endpoint, - a2a_agent_name=a2a_agent_name, - agent_card=agent_card, - ), - ) - return str(result_text), task_id_config, reference_task_ids, remote_notice - - return None, task_id_config, reference_task_ids, remote_notice - - -def _handle_agent_response_and_continue( - self: Agent, - a2a_result: TaskStateResult, - agent_id: str, - agent_cards: dict[str, AgentCard] | None, - a2a_agents: list[A2AConfig | A2AClientConfig], - original_task_description: str, - conversation_history: list[Message], - turn_num: int, - max_turns: int, - task: Task, - original_fn: Callable[..., str], - context: str | None, - tools: list[BaseTool] | None, - agent_response_model: type[BaseModel] | None, - extension_registry: ExtensionRegistry | None = None, - remote_status_notice: str = "", - endpoint: str | None = None, - a2a_agent_name: str | None = None, - agent_card: dict[str, Any] | None = None, -) -> tuple[str | None, str | None]: - """Handle A2A result and get CrewAI agent's response. - - Args: - self: The agent instance - a2a_result: Result from A2A delegation - agent_id: ID of the A2A agent - agent_cards: Pre-fetched agent cards - a2a_agents: List of A2A configurations - original_task_description: Original task description - conversation_history: Conversation history - turn_num: Current turn number - max_turns: Maximum turns allowed - task: The task being executed - original_fn: Original execute_task method - context: Optional context - tools: Optional tools - agent_response_model: Response model for parsing - - Returns: - Tuple of (final_result, current_request) where: - - final_result is not None if conversation should end - - current_request is the next message to send if continuing - """ - agent_cards_dict = _prepare_agent_cards_dict(a2a_result, agent_id, agent_cards) - - ( - task.description, - disable_structured_output, - extension_states, - ) = _augment_prompt_with_a2a( - a2a_agents=a2a_agents, - task_description=original_task_description, - conversation_history=conversation_history, - turn_num=turn_num, - max_turns=max_turns, - agent_cards=agent_cards_dict, - remote_status_notice=remote_status_notice, - ) - - original_response_model = task.response_model - if disable_structured_output: - task.response_model = None - - raw_result = original_fn(self, task, context, tools) - - if disable_structured_output: - task.response_model = original_response_model - - return _process_response_result( - raw_result=raw_result, - disable_structured_output=disable_structured_output, - turn_num=turn_num, - agent_role=self.role, - agent_response_model=agent_response_model, - extension_registry=extension_registry, - extension_states=extension_states, - from_task=task, - from_agent=self, - endpoint=endpoint, - a2a_agent_name=a2a_agent_name, - agent_card=agent_card, - ) - - -def _delegate_to_a2a( - self: Agent, - agent_response: AgentResponseProtocol, - task: Task, - original_fn: Callable[..., str], - context: str | None, - tools: list[BaseTool] | None, - agent_cards: dict[str, AgentCard] | None = None, - original_task_description: str | None = None, - _extension_registry: ExtensionRegistry | None = None, -) -> str: - """Delegate to A2A agent with multi-turn conversation support. - - Args: - self: The agent instance - agent_response: The AgentResponse indicating delegation - task: The task being executed (for extracting A2A fields) - original_fn: The original execute_task method for follow-ups - context: Optional context for task execution - tools: Optional tools available to the agent - agent_cards: Pre-fetched agent cards from _execute_task_with_a2a - original_task_description: The original task description before A2A augmentation - _extension_registry: Optional registry of A2A extensions (unused, reserved for future use) - - Returns: - Result from A2A agent - - Raises: - ImportError: If a2a-sdk is not installed - """ - ctx = _prepare_delegation_context( - self, agent_response, task, original_task_description - ) - state = _init_delegation_state(ctx, agent_cards) - current_request = state.current_request - context_id = state.context_id - task_id = state.task_id - reference_task_ids = state.reference_task_ids - conversation_history = state.conversation_history - - try: - for turn_num in range(ctx.max_turns): - agent_branch, accepted_output_modes = _get_turn_context(ctx.agent_config) - - merged_metadata = dict(ctx.metadata) if ctx.metadata else {} - if _extension_registry and conversation_history: - _ext_states = _extension_registry.extract_all_states( - conversation_history - ) - merged_metadata.update( - _extension_registry.prepare_all_metadata(_ext_states) - ) - - a2a_result = execute_a2a_delegation( - endpoint=ctx.agent_config.endpoint, - auth=ctx.agent_config.auth, - timeout=ctx.agent_config.timeout, - task_description=current_request, - context_id=context_id, - task_id=task_id, - reference_task_ids=reference_task_ids, - metadata=merged_metadata or None, - extensions=ctx.extensions, - conversation_history=conversation_history, - agent_id=ctx.agent_id, - agent_role=Role.user, - agent_branch=agent_branch, - response_model=ctx.agent_config.response_model, - turn_number=turn_num + 1, - updates=ctx.agent_config.updates, - transport=ctx.agent_config.transport, - from_task=task, - from_agent=self, - client_extensions=getattr(ctx.agent_config, "extensions", None), - accepted_output_modes=accepted_output_modes, - input_files=task.input_files, - ) - - conversation_history = a2a_result.get("history", []) - - if conversation_history: - latest_message = conversation_history[-1] - if latest_message.task_id is not None: - task_id = latest_message.task_id - if latest_message.context_id is not None: - context_id = latest_message.context_id - - if a2a_result["status"] in [TaskState.completed, TaskState.input_required]: - trusted_result, task_id, reference_task_ids, remote_notice = ( - _handle_task_completion( - a2a_result, - task, - task_id, - reference_task_ids, - ctx.agent_config, - turn_num, - from_task=task, - from_agent=self, - endpoint=ctx.agent_config.endpoint, - a2a_agent_name=state.agent_name, - agent_card=state.agent_card_dict, - ) - ) - if trusted_result is not None: - return trusted_result - - final_result, next_request = _handle_agent_response_and_continue( - self=self, - a2a_result=a2a_result, - agent_id=ctx.agent_id, - agent_cards=agent_cards, - a2a_agents=ctx.a2a_agents, - original_task_description=ctx.original_task_description, - conversation_history=conversation_history, - turn_num=turn_num, - max_turns=ctx.max_turns, - task=task, - original_fn=original_fn, - context=context, - tools=tools, - agent_response_model=ctx.agent_response_model, - extension_registry=_extension_registry, - remote_status_notice=remote_notice, - endpoint=ctx.agent_config.endpoint, - a2a_agent_name=state.agent_name, - agent_card=state.agent_card_dict, - ) - - if final_result is not None: - return final_result - - if next_request is not None: - current_request = next_request - - continue - - error_msg = a2a_result.get("error", "Unknown error") - - final_result, next_request = _handle_agent_response_and_continue( - self=self, - a2a_result=a2a_result, - agent_id=ctx.agent_id, - agent_cards=agent_cards, - a2a_agents=ctx.a2a_agents, - original_task_description=ctx.original_task_description, - conversation_history=conversation_history, - turn_num=turn_num, - max_turns=ctx.max_turns, - task=task, - original_fn=original_fn, - context=context, - tools=tools, - agent_response_model=ctx.agent_response_model, - extension_registry=_extension_registry, - endpoint=ctx.agent_config.endpoint, - a2a_agent_name=state.agent_name, - agent_card=state.agent_card_dict, - ) - - if final_result is not None: - return final_result - - if next_request is not None: - current_request = next_request - continue - - return _emit_delegation_failed( - error_msg, - turn_num, - task, - self, - ctx.agent_config.endpoint, - state.agent_name, - state.agent_card_dict, - ) - - return _handle_max_turns_exceeded( - conversation_history, - ctx.max_turns, - from_task=task, - from_agent=self, - endpoint=ctx.agent_config.endpoint, - a2a_agent_name=state.agent_name, - agent_card=state.agent_card_dict, - ) - - finally: - task.description = ctx.original_task_description - - async def _afetch_card_from_config( config: A2AConfig | A2AClientConfig, ) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]: - """Fetch agent card from A2A config asynchronously.""" + """Async variant of :func:`_fetch_card_from_config`.""" try: card = await afetch_agent_card( endpoint=config.endpoint, @@ -1436,15 +240,15 @@ async def _afetch_card_from_config( async def _afetch_agent_cards_concurrently( a2a_agents: list[A2AConfig | A2AClientConfig], ) -> tuple[dict[str, AgentCard], dict[str, str]]: - """Fetch agent cards concurrently for multiple A2A agents using asyncio.""" + """Async variant of :func:`_fetch_agent_cards_concurrently`.""" agent_cards: dict[str, AgentCard] = {} failed_agents: dict[str, str] = {} if not a2a_agents: return agent_cards, failed_agents - tasks = [_afetch_card_from_config(config) for config in a2a_agents] - results = await asyncio.gather(*tasks) + fetch_tasks = [_afetch_card_from_config(config) for config in a2a_agents] + results = await asyncio.gather(*fetch_tasks) for config, result in results: if isinstance(result, Exception): @@ -1460,313 +264,273 @@ async def _afetch_agent_cards_concurrently( return agent_cards, failed_agents +def _build_unavailable_notice(failed_agents: dict[str, str]) -> str: + text = "" + for endpoint, error in failed_agents.items(): + text += f" - {endpoint}: {error}\n" + return UNAVAILABLE_AGENTS_NOTICE_TEMPLATE.substitute(unavailable_agents=text) + + +def _augment_prompt_with_a2a( + a2a_agents: list[A2AConfig | A2AClientConfig], + task_description: str, + agent_cards: Mapping[str, AgentCard | dict[str, Any]], + failed_agents: dict[str, str] | None = None, +) -> str: + """Add A2A delegation context (the available agent cards) to a prompt. + + Tool-call mechanics are documented inside the template; this only renders + the cards themselves so the LLM can see each remote agent's capabilities. + """ + if not agent_cards: + return task_description + + agents_text = "" + for config in a2a_agents: + if config.endpoint in agent_cards: + card = agent_cards[config.endpoint] + if isinstance(card, dict): + filtered = { + k: v + for k, v in card.items() + if k in {"name", "description", "url", "skills"} and v is not None + } + agents_text += f"\n{json.dumps(filtered, indent=2)}\n" + else: + agents_text += ( + "\n" + + card.model_dump_json( + indent=2, + exclude_none=True, + include={"name", "description", "url", "skills"}, + ) + + "\n" + ) + + failed_agents = failed_agents or {} + if failed_agents: + agents_text += "\n\n" + for endpoint, error in failed_agents.items(): + agents_text += ( + f"\n\n" + ) + + available = AVAILABLE_AGENTS_TEMPLATE.substitute(available_a2a_agents=agents_text) + return f"{task_description}\n{available}\n" + + +def _execute_task_with_a2a( + self: Agent, + a2a_agents: list[A2AConfig | A2AClientConfig], + original_fn: Callable[..., str], + task: Task, + context: str | None, + tools: list[BaseTool] | None, + extension_registry: ExtensionRegistry, +) -> str: + """Wrap execute_task with A2A delegation logic (sync).""" + original_description: str = task.description + agent_cards, failed_agents = _fetch_agent_cards_concurrently(a2a_agents) + + if not agent_cards and a2a_agents and failed_agents: + task.description = ( + f"{original_description}{_build_unavailable_notice(failed_agents)}" + ) + try: + return original_fn(self, task, context, tools) + finally: + task.description = original_description + + state = A2ADelegationState( + agent=self, task=task, extension_registry=extension_registry + ) + a2a_tools = build_a2a_tools(a2a_agents, agent_cards, state) + + augmented = _augment_prompt_with_a2a( + a2a_agents=a2a_agents, + task_description=original_description, + agent_cards=agent_cards, + failed_agents=failed_agents, + ) + if extension_registry: + augmented = extension_registry.augment_prompt_with_all(augmented, {}) + + task.description = augmented + combined_tools: list[BaseTool] = [*(tools or []), *a2a_tools] + try: + return original_fn(self, task, context, combined_tools) + finally: + task.description = original_description + + async def _aexecute_task_with_a2a( self: Agent, a2a_agents: list[A2AConfig | A2AClientConfig], original_fn: Callable[..., Coroutine[Any, Any, str]], task: Task, - agent_response_model: type[BaseModel] | None, context: str | None, tools: list[BaseTool] | None, extension_registry: ExtensionRegistry, ) -> str: - """Async version of _execute_task_with_a2a.""" + """Async variant of :func:`_execute_task_with_a2a`.""" original_description: str = task.description - original_output_pydantic = task.output_pydantic - original_response_model = task.response_model - agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_agents) if not agent_cards and a2a_agents and failed_agents: - unavailable_agents_text = "" - for endpoint, error in failed_agents.items(): - unavailable_agents_text += f" - {endpoint}: {error}\n" - - notice = UNAVAILABLE_AGENTS_NOTICE_TEMPLATE.substitute( - unavailable_agents=unavailable_agents_text + task.description = ( + f"{original_description}{_build_unavailable_notice(failed_agents)}" ) - task.description = f"{original_description}{notice}" - try: return await original_fn(self, task, context, tools) finally: task.description = original_description - task.description, _, extension_states = _augment_prompt_with_a2a( + state = A2ADelegationState( + agent=self, task=task, extension_registry=extension_registry + ) + a2a_tools = build_a2a_tools(a2a_agents, agent_cards, state) + + augmented = _augment_prompt_with_a2a( a2a_agents=a2a_agents, task_description=original_description, agent_cards=agent_cards, failed_agents=failed_agents, - extension_registry=extension_registry, ) - task.response_model = agent_response_model + if extension_registry: + augmented = extension_registry.augment_prompt_with_all(augmented, {}) + task.description = augmented + combined_tools: list[BaseTool] = [*(tools or []), *a2a_tools] try: - raw_result = await original_fn(self, task, context, tools) - agent_response = _parse_agent_response( - raw_result=raw_result, agent_response_model=agent_response_model - ) - - if extension_registry and isinstance(agent_response, BaseModel): - agent_response = extension_registry.process_response_with_all( - agent_response, extension_states - ) - - if isinstance(agent_response, BaseModel) and isinstance( - agent_response, AgentResponseProtocol - ): - if agent_response.is_a2a: - return await _adelegate_to_a2a( - self, - agent_response=agent_response, - task=task, - original_fn=original_fn, - context=context, - tools=tools, - agent_cards=agent_cards, - original_task_description=original_description, - _extension_registry=extension_registry, - ) - task.output_pydantic = None - return agent_response.message - - return raw_result + return await original_fn(self, task, context, combined_tools) finally: task.description = original_description - task.output_pydantic = original_output_pydantic - task.response_model = original_response_model -async def _ahandle_agent_response_and_continue( - self: Agent, - a2a_result: TaskStateResult, - agent_id: str, - agent_cards: dict[str, AgentCard] | None, - a2a_agents: list[A2AConfig | A2AClientConfig], - original_task_description: str, - conversation_history: list[Message], - turn_num: int, - max_turns: int, - task: Task, - original_fn: Callable[..., Coroutine[Any, Any, str]], - context: str | None, - tools: list[BaseTool] | None, - agent_response_model: type[BaseModel] | None, - extension_registry: ExtensionRegistry | None = None, - remote_status_notice: str = "", - endpoint: str | None = None, - a2a_agent_name: str | None = None, - agent_card: dict[str, Any] | None = None, -) -> tuple[str | None, str | None]: - """Async version of _handle_agent_response_and_continue.""" - agent_cards_dict = _prepare_agent_cards_dict(a2a_result, agent_id, agent_cards) - - ( - task.description, - disable_structured_output, - extension_states, - ) = _augment_prompt_with_a2a( - a2a_agents=a2a_agents, - task_description=original_task_description, - conversation_history=conversation_history, - turn_num=turn_num, - max_turns=max_turns, - agent_cards=agent_cards_dict, - remote_status_notice=remote_status_notice, - ) - - original_response_model = task.response_model - if disable_structured_output: - task.response_model = None - - raw_result = await original_fn(self, task, context, tools) - - if disable_structured_output: - task.response_model = original_response_model - - return _process_response_result( - raw_result=raw_result, - disable_structured_output=disable_structured_output, - turn_num=turn_num, - agent_role=self.role, - agent_response_model=agent_response_model, - extension_registry=extension_registry, - extension_states=extension_states, - from_task=task, - from_agent=self, - endpoint=endpoint, - a2a_agent_name=a2a_agent_name, - agent_card=agent_card, - ) - - -async def _adelegate_to_a2a( - self: Agent, - agent_response: AgentResponseProtocol, - task: Task, - original_fn: Callable[..., Coroutine[Any, Any, str]], - context: str | None, - tools: list[BaseTool] | None, - agent_cards: dict[str, AgentCard] | None = None, - original_task_description: str | None = None, - _extension_registry: ExtensionRegistry | None = None, -) -> str: - """Async version of _delegate_to_a2a.""" - ctx = _prepare_delegation_context( - self, agent_response, task, original_task_description - ) - state = _init_delegation_state(ctx, agent_cards) - current_request = state.current_request - context_id = state.context_id - task_id = state.task_id - reference_task_ids = state.reference_task_ids - conversation_history = state.conversation_history - +@contextlib.contextmanager +def _temporarily_extend_tools(agent: Agent, extra: list[BaseTool]) -> Iterator[None]: + """Append ``extra`` to ``agent.tools`` for the lifetime of the context.""" + if not extra: + yield + return + original_tools = agent.tools + if original_tools is None: + agent.tools = list(extra) + else: + agent.tools = [*original_tools, *extra] try: - for turn_num in range(ctx.max_turns): - agent_branch, accepted_output_modes = _get_turn_context(ctx.agent_config) + yield + finally: + agent.tools = original_tools - merged_metadata = dict(ctx.metadata) if ctx.metadata else {} - if _extension_registry and conversation_history: - _ext_states = _extension_registry.extract_all_states( - conversation_history - ) - merged_metadata.update( - _extension_registry.prepare_all_metadata(_ext_states) - ) - a2a_result = await aexecute_a2a_delegation( - endpoint=ctx.agent_config.endpoint, - auth=ctx.agent_config.auth, - timeout=ctx.agent_config.timeout, - task_description=current_request, - context_id=context_id, - task_id=task_id, - reference_task_ids=reference_task_ids, - metadata=merged_metadata or None, - extensions=ctx.extensions, - conversation_history=conversation_history, - agent_id=ctx.agent_id, - agent_role=Role.user, - agent_branch=agent_branch, - response_model=ctx.agent_config.response_model, - turn_number=turn_num + 1, - transport=ctx.agent_config.transport, - updates=ctx.agent_config.updates, - from_task=task, - from_agent=self, - client_extensions=getattr(ctx.agent_config, "extensions", None), - accepted_output_modes=accepted_output_modes, - input_files=task.input_files, - ) +def _kickoff_description(messages: str | list[Any]) -> str: + if isinstance(messages, str): + return messages + content = next( + (m["content"] for m in reversed(messages) if m["role"] == "user"), + None, + ) + return content if isinstance(content, str) else "" - conversation_history = a2a_result.get("history", []) - if conversation_history: - latest_message = conversation_history[-1] - if latest_message.task_id is not None: - task_id = latest_message.task_id - if latest_message.context_id is not None: - context_id = latest_message.context_id +def _kickoff_with_a2a( + self: Agent, + a2a_agents: list[A2AConfig | A2AClientConfig], + original_kickoff: Callable[..., LiteAgentOutput], + messages: str | list[Any], + response_format: type[Any] | None, + input_files: dict[str, Any] | None, + extension_registry: ExtensionRegistry, +) -> LiteAgentOutput: + """Execute kickoff with A2A delegation support (sync).""" + description = _kickoff_description(messages) + if not description: + return original_kickoff(self, messages, response_format, input_files) - if a2a_result["status"] in [TaskState.completed, TaskState.input_required]: - trusted_result, task_id, reference_task_ids, remote_notice = ( - _handle_task_completion( - a2a_result, - task, - task_id, - reference_task_ids, - ctx.agent_config, - turn_num, - from_task=task, - from_agent=self, - endpoint=ctx.agent_config.endpoint, - a2a_agent_name=state.agent_name, - agent_card=state.agent_card_dict, - ) - ) - if trusted_result is not None: - return trusted_result + agent_cards, failed_agents = _fetch_agent_cards_concurrently(a2a_agents) + if not agent_cards and a2a_agents and failed_agents: + return original_kickoff(self, messages, response_format, input_files) - final_result, next_request = await _ahandle_agent_response_and_continue( - self=self, - a2a_result=a2a_result, - agent_id=ctx.agent_id, - agent_cards=agent_cards, - a2a_agents=ctx.a2a_agents, - original_task_description=ctx.original_task_description, - conversation_history=conversation_history, - turn_num=turn_num, - max_turns=ctx.max_turns, - task=task, - original_fn=original_fn, - context=context, - tools=tools, - agent_response_model=ctx.agent_response_model, - extension_registry=_extension_registry, - remote_status_notice=remote_notice, - endpoint=ctx.agent_config.endpoint, - a2a_agent_name=state.agent_name, - agent_card=state.agent_card_dict, - ) + fake_task = Task( + description=description, + agent=self, + expected_output="Result from A2A delegation", + input_files=input_files or {}, + ) + state = A2ADelegationState( + agent=self, task=fake_task, extension_registry=extension_registry + ) + a2a_tools = build_a2a_tools(a2a_agents, agent_cards, state) - if final_result is not None: - return final_result + augmented = _augment_prompt_with_a2a( + a2a_agents=a2a_agents, + task_description=description, + agent_cards=agent_cards, + failed_agents=failed_agents, + ) + if extension_registry: + augmented = extension_registry.augment_prompt_with_all(augmented, {}) - if next_request is not None: - current_request = next_request + if isinstance(messages, str): + wrapped_messages: str | list[Any] = augmented + else: + wrapped_messages = [*messages, {"role": "user", "content": augmented}] - continue + with _temporarily_extend_tools(self, a2a_tools): + return original_kickoff(self, wrapped_messages, response_format, input_files) - error_msg = a2a_result.get("error", "Unknown error") - final_result, next_request = await _ahandle_agent_response_and_continue( - self=self, - a2a_result=a2a_result, - agent_id=ctx.agent_id, - agent_cards=agent_cards, - a2a_agents=ctx.a2a_agents, - original_task_description=ctx.original_task_description, - conversation_history=conversation_history, - turn_num=turn_num, - max_turns=ctx.max_turns, - task=task, - original_fn=original_fn, - context=context, - tools=tools, - agent_response_model=ctx.agent_response_model, - extension_registry=_extension_registry, - endpoint=ctx.agent_config.endpoint, - a2a_agent_name=state.agent_name, - agent_card=state.agent_card_dict, - ) - - if final_result is not None: - return final_result - - if next_request is not None: - current_request = next_request - continue - - return _emit_delegation_failed( - error_msg, - turn_num, - task, - self, - ctx.agent_config.endpoint, - state.agent_name, - state.agent_card_dict, - ) - - return _handle_max_turns_exceeded( - conversation_history, - ctx.max_turns, - from_task=task, - from_agent=self, - endpoint=ctx.agent_config.endpoint, - a2a_agent_name=state.agent_name, - agent_card=state.agent_card_dict, +async def _akickoff_with_a2a( + self: Agent, + a2a_agents: list[A2AConfig | A2AClientConfig], + original_kickoff_async: Callable[..., Coroutine[Any, Any, LiteAgentOutput]], + messages: str | list[Any], + response_format: type[Any] | None, + input_files: dict[str, Any] | None, + extension_registry: ExtensionRegistry, +) -> LiteAgentOutput: + """Execute kickoff with A2A delegation support (async).""" + description = _kickoff_description(messages) + if not description: + return await original_kickoff_async( + self, messages, response_format, input_files ) - finally: - task.description = ctx.original_task_description + agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_agents) + if not agent_cards and a2a_agents and failed_agents: + return await original_kickoff_async( + self, messages, response_format, input_files + ) + + fake_task = Task( + description=description, + agent=self, + expected_output="Result from A2A delegation", + input_files=input_files or {}, + ) + state = A2ADelegationState( + agent=self, task=fake_task, extension_registry=extension_registry + ) + a2a_tools = build_a2a_tools(a2a_agents, agent_cards, state) + + augmented = _augment_prompt_with_a2a( + a2a_agents=a2a_agents, + task_description=description, + agent_cards=agent_cards, + failed_agents=failed_agents, + ) + if extension_registry: + augmented = extension_registry.augment_prompt_with_all(augmented, {}) + + if isinstance(messages, str): + wrapped_messages: str | list[Any] = augmented + else: + wrapped_messages = [*messages, {"role": "user", "content": augmented}] + + with _temporarily_extend_tools(self, a2a_tools): + return await original_kickoff_async( + self, wrapped_messages, response_format, input_files + ) diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 561307680..8def29f27 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -112,12 +112,6 @@ from crewai.utilities.token_counter_callback import TokenCalcHandler from crewai.utilities.training_handler import CrewTrainingHandler -try: - from crewai.a2a.types import AgentResponseProtocol -except ImportError: - AgentResponseProtocol = None # type: ignore[assignment, misc] - - if TYPE_CHECKING: from crewai_files import FileInput @@ -626,15 +620,7 @@ class Agent(BaseAgent): result = process_tool_results(self, result) - output_for_event = result - if ( - AgentResponseProtocol is not None - and isinstance(result, BaseModel) - and isinstance(result, AgentResponseProtocol) - ): - output_for_event = str(result.message) - elif not isinstance(result, str): - output_for_event = str(result) + output_for_event = result if isinstance(result, str) else str(result) crewai_event_bus.emit( self, diff --git a/lib/crewai/src/crewai/lite_agent.py b/lib/crewai/src/crewai/lite_agent.py index cd9823e15..e16530cbf 100644 --- a/lib/crewai/src/crewai/lite_agent.py +++ b/lib/crewai/src/crewai/lite_agent.py @@ -121,11 +121,11 @@ def _kickoff_with_a2a_support( Returns: LiteAgentOutput from either local execution or A2A delegation. """ - from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model + from crewai.a2a.utils.response_model import extract_a2a_client_configs from crewai.a2a.wrapper import _execute_task_with_a2a from crewai.task import Task - a2a_agents, agent_response_model = get_a2a_agents_and_response_model(agent.a2a) + a2a_agents = extract_a2a_client_configs(agent.a2a) if not a2a_agents: return original_kickoff(messages, response_format, input_files) @@ -160,7 +160,6 @@ def _kickoff_with_a2a_support( a2a_agents=a2a_agents, original_fn=task_to_kickoff_adapter, task=fake_task, - agent_response_model=agent_response_model, context=None, tools=None, extension_registry=extension_registry, diff --git a/lib/crewai/tests/agents/test_a2a_trust_completion_status.py b/lib/crewai/tests/agents/test_a2a_trust_completion_status.py index 6347f8e1c..f33a2351f 100644 --- a/lib/crewai/tests/agents/test_a2a_trust_completion_status.py +++ b/lib/crewai/tests/agents/test_a2a_trust_completion_status.py @@ -1,13 +1,14 @@ -"""Test trust_remote_completion_status flag in A2A wrapper.""" +"""Tests for A2A delegation tool behavior, including trust_remote_completion_status.""" from unittest.mock import MagicMock, patch import pytest -from crewai.a2a.config import A2AConfig +from crewai.a2a.config import A2AClientConfig, A2AConfig + try: - from a2a.types import Message, Role + from a2a.types import TaskState # noqa: F401 A2A_SDK_INSTALLED = True except ImportError: @@ -15,141 +16,126 @@ except ImportError: def _create_mock_agent_card(name: str = "Test", url: str = "http://test-endpoint.com/"): - """Create a mock agent card with proper model_dump behavior.""" + """Create a mock agent card with the attributes A2ADelegationTool reads.""" mock_card = MagicMock() mock_card.name = name mock_card.url = url + mock_card.description = "A test agent" + mock_card.skills = [] mock_card.model_dump.return_value = {"name": name, "url": url} - mock_card.model_dump_json.return_value = f'{{"name": "{name}", "url": "{url}"}}' return mock_card @pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") -def test_trust_remote_completion_status_true_returns_directly(): - """When trust_remote_completion_status=True and A2A returns completed, return result directly.""" - from crewai.a2a.wrapper import _delegate_to_a2a - from crewai.a2a.types import AgentResponseProtocol +def test_delegation_tool_returns_remote_result_on_completion(): + """A successful remote completion is returned to the local LLM as the tool result.""" + from a2a.types import TaskState + from crewai import Agent, Task + from crewai.a2a.tools import A2ADelegationState, build_a2a_tools - a2a_config = A2AConfig( - endpoint="http://test-endpoint.com", - trust_remote_completion_status=True, - ) - - agent = Agent( - role="test manager", - goal="coordinate", - backstory="test", - a2a=a2a_config, - ) - + config = A2AClientConfig(endpoint="http://test-endpoint.com") + agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config) task = Task(description="test", expected_output="test", agent=agent) - class MockResponse: - is_a2a = True - message = "Please help" - a2a_ids = ["http://test-endpoint.com/"] + card = _create_mock_agent_card() + state = A2ADelegationState(agent=agent, task=task) + tools = build_a2a_tools([config], {config.endpoint: card}, state) + assert len(tools) == 1 + tool = tools[0] - with ( - patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute, - patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch, - ): - mock_card = _create_mock_agent_card() - mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {}) - - # A2A returns completed + with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute: mock_execute.return_value = { - "status": "completed", + "status": TaskState.completed, "result": "Done by remote", "history": [], } + result = tool._run(message="Please help") - # This should return directly without checking LLM response - result = _delegate_to_a2a( - self=agent, - agent_response=MockResponse(), - task=task, - original_fn=lambda *args, **kwargs: "fallback", - context=None, - tools=None, - agent_cards={"http://test-endpoint.com/": mock_card}, - original_task_description="test", - ) - - assert result == "Done by remote" - assert mock_execute.call_count == 1 + assert result == "Done by remote" + assert mock_execute.call_count == 1 @pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") -def test_trust_remote_completion_status_false_continues_conversation(): - """When trust_remote_completion_status=False and A2A returns completed, ask server agent.""" - from crewai.a2a.wrapper import _delegate_to_a2a +def test_delegation_tool_records_completed_task_in_references(): + """When a remote task completes with a task_id, it goes into reference_task_ids.""" + from a2a.types import TaskState + from crewai import Agent, Task + from crewai.a2a.tools import A2ADelegationState, build_a2a_tools - a2a_config = A2AConfig( - endpoint="http://test-endpoint.com", - trust_remote_completion_status=False, - ) - - agent = Agent( - role="test manager", - goal="coordinate", - backstory="test", - a2a=a2a_config, - ) - + config = A2AClientConfig(endpoint="http://test-endpoint.com") + agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config) task = Task(description="test", expected_output="test", agent=agent) - class MockResponse: - is_a2a = True - message = "Please help" - a2a_ids = ["http://test-endpoint.com/"] + card = _create_mock_agent_card() + state = A2ADelegationState(agent=agent, task=task) + [tool] = build_a2a_tools([config], {config.endpoint: card}, state) - call_count = 0 + history_msg = MagicMock() + history_msg.task_id = "remote-task-1" + history_msg.context_id = "ctx-1" - def mock_original_fn(self, task, context, tools): - nonlocal call_count - call_count += 1 - if call_count == 1: - # Server decides to finish - return '{"is_a2a": false, "message": "Server final answer", "a2a_ids": []}' - return "unexpected" - - with ( - patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute, - patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch, - ): - mock_card = _create_mock_agent_card() - mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {}) - - # A2A returns completed + with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute: mock_execute.return_value = { - "status": "completed", - "result": "Done by remote", - "history": [], + "status": TaskState.completed, + "result": "Done", + "history": [history_msg], } + tool._run(message="Please help") - result = _delegate_to_a2a( - self=agent, - agent_response=MockResponse(), - task=task, - original_fn=mock_original_fn, - context=None, - tools=None, - agent_cards={"http://test-endpoint.com/": mock_card}, - original_task_description="test", - ) - - # Should call original_fn to get server response - assert call_count >= 1 - assert result == "Server final answer" + endpoint_state = state._per_endpoint[config.endpoint] + assert "remote-task-1" in endpoint_state.reference_task_ids + assert endpoint_state.task_id is None + assert task.config is not None + assert task.config["reference_task_ids"] == ["remote-task-1"] @pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +def test_delegation_tool_returns_error_message_on_failure(): + """A non-completed/non-input-required status surfaces as a readable error string.""" + from a2a.types import TaskState + + from crewai import Agent, Task + from crewai.a2a.tools import A2ADelegationState, build_a2a_tools + + config = A2AClientConfig(endpoint="http://test-endpoint.com") + agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config) + task = Task(description="test", expected_output="test", agent=agent) + + card = _create_mock_agent_card() + state = A2ADelegationState(agent=agent, task=task) + [tool] = build_a2a_tools([config], {config.endpoint: card}, state) + + with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute: + mock_execute.return_value = { + "status": TaskState.failed, + "error": "remote agent unreachable", + "history": [], + } + result = tool._run(message="Please help") + + assert "remote agent unreachable" in result + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +def test_delegation_tool_respects_max_turns_via_usage_count(): + """A2AConfig.max_turns wires through to BaseTool.max_usage_count.""" + from crewai import Agent, Task + from crewai.a2a.tools import A2ADelegationState, build_a2a_tools + + config = A2AClientConfig(endpoint="http://test-endpoint.com", max_turns=2) + agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config) + task = Task(description="test", expected_output="test", agent=agent) + + card = _create_mock_agent_card() + state = A2ADelegationState(agent=agent, task=task) + [tool] = build_a2a_tools([config], {config.endpoint: card}, state) + + assert tool.max_usage_count == 2 + + def test_default_trust_remote_completion_status_is_false(): """Verify that default value of trust_remote_completion_status is False.""" - a2a_config = A2AConfig( - endpoint="http://test-endpoint.com", - ) - + a2a_config = A2AConfig(endpoint="http://test-endpoint.com") assert a2a_config.trust_remote_completion_status is False \ No newline at end of file