mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-05-15 05:58:10 +00:00
Compare commits
4 Commits
docs/custo
...
gl/refacto
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f32fe819b4 | ||
|
|
189d769cb9 | ||
|
|
6fe34644ab | ||
|
|
27463ce8c4 |
@@ -1,23 +1,20 @@
|
||||
"""String templates for A2A (Agent-to-Agent) protocol messaging and status."""
|
||||
"""String templates for A2A (Agent-to-Agent) delegation prompts."""
|
||||
|
||||
from string import Template
|
||||
from typing import Final
|
||||
|
||||
|
||||
AVAILABLE_AGENTS_TEMPLATE: Final[Template] = Template(
|
||||
"\n<AVAILABLE_A2A_AGENTS>\n $available_a2a_agents\n</AVAILABLE_A2A_AGENTS>\n"
|
||||
)
|
||||
PREVIOUS_A2A_CONVERSATION_TEMPLATE: Final[Template] = Template(
|
||||
"\n<PREVIOUS_A2A_CONVERSATION>\n"
|
||||
" $previous_a2a_conversation"
|
||||
"\n</PREVIOUS_A2A_CONVERSATION>\n"
|
||||
)
|
||||
CONVERSATION_TURN_INFO_TEMPLATE: Final[Template] = Template(
|
||||
"\n<CONVERSATION_PROGRESS>\n"
|
||||
' turn="$turn_count"\n'
|
||||
' max_turns="$max_turns"\n'
|
||||
" $warning"
|
||||
"\n</CONVERSATION_PROGRESS>\n"
|
||||
"\n<AVAILABLE_A2A_AGENTS>\n"
|
||||
"You can delegate to remote agents using the delegate_to_* tools below. "
|
||||
"Each tool's description lists the remote agent's capabilities — call the "
|
||||
"tool whose capabilities best match the task. Pass the question or sub-task "
|
||||
"to the remote agent via the tool's `message` argument; the tool returns "
|
||||
"the remote agent's response, which you should incorporate into your final "
|
||||
"answer. If the available agents are not a good fit, answer directly "
|
||||
"without calling a delegation tool.\n\n"
|
||||
" $available_a2a_agents"
|
||||
"\n</AVAILABLE_A2A_AGENTS>\n"
|
||||
)
|
||||
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template(
|
||||
"\n<A2A_AGENTS_STATUS>\n"
|
||||
@@ -27,29 +24,3 @@ UNAVAILABLE_AGENTS_NOTICE_TEMPLATE: Final[Template] = Template(
|
||||
" $unavailable_agents"
|
||||
"\n</A2A_AGENTS_STATUS>\n"
|
||||
)
|
||||
REMOTE_AGENT_COMPLETED_NOTICE: Final[str] = """
|
||||
<REMOTE_AGENT_STATUS>
|
||||
STATUS: COMPLETED
|
||||
The remote agent has finished processing your request. Their response is in the conversation history above.
|
||||
You MUST now:
|
||||
1. Extract the answer from the conversation history
|
||||
2. Set is_a2a=false
|
||||
3. Return the answer as your final message
|
||||
DO NOT send another request - the task is already done.
|
||||
</REMOTE_AGENT_STATUS>
|
||||
"""
|
||||
|
||||
REMOTE_AGENT_RESPONSE_NOTICE: Final[str] = """
|
||||
<REMOTE_AGENT_STATUS>
|
||||
STATUS: RESPONSE_RECEIVED
|
||||
The remote agent has responded. Their response is in the conversation history above.
|
||||
|
||||
You MUST now:
|
||||
1. Set is_a2a=false (the remote task is complete and cannot receive more messages)
|
||||
2. Provide YOUR OWN response to the original task based on the information received
|
||||
|
||||
IMPORTANT: Your response should be addressed to the USER who gave you the original task.
|
||||
Report what the remote agent told you in THIRD PERSON (e.g., "The remote agent said..." or "I learned that...").
|
||||
Do NOT address the remote agent directly or use "you" to refer to them.
|
||||
</REMOTE_AGENT_STATUS>
|
||||
"""
|
||||
|
||||
394
lib/crewai/src/crewai/a2a/tools.py
Normal file
394
lib/crewai/src/crewai/a2a/tools.py
Normal file
@@ -0,0 +1,394 @@
|
||||
"""Tool-based A2A delegation.
|
||||
|
||||
Each remote A2A agent is exposed to the local LLM as a BaseTool. The local
|
||||
agent's normal tool-call loop drives multi-turn delegation: each tool call is
|
||||
one turn against the remote agent. Per-endpoint conversation state lives in
|
||||
``A2ADelegationState`` and is shared across the tools built for a single task.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from a2a.types import Role, TaskState
|
||||
from pydantic import BaseModel, Field, PrivateAttr
|
||||
|
||||
from crewai.a2a.config import A2AClientConfig, A2AConfig
|
||||
from crewai.a2a.extensions.base import (
|
||||
A2AExtension,
|
||||
ConversationState,
|
||||
ExtensionRegistry,
|
||||
)
|
||||
from crewai.a2a.task_helpers import TaskStateResult
|
||||
from crewai.a2a.utils.delegation import aexecute_a2a_delegation, execute_a2a_delegation
|
||||
from crewai.events.event_bus import crewai_event_bus
|
||||
from crewai.events.types.a2a_events import A2AConversationCompletedEvent
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.utilities.string_utils import sanitize_tool_name
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from a2a.types import AgentCard, Message
|
||||
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
_DELEGATE_PREFIX = "delegate_to_"
|
||||
|
||||
|
||||
@dataclass
class _EndpointState:
    """Mutable per-endpoint conversation state across tool calls."""

    # Full A2A message history for this endpoint; replaced wholesale after each turn.
    conversation_history: list[Message] = field(default_factory=list)
    # Remote context identifier, adopted from the latest response message.
    context_id: str | None = None
    # Remote task identifier for the in-flight task; cleared when the task completes.
    task_id: str | None = None
    # IDs of previously completed remote tasks, sent as references on later turns.
    reference_task_ids: list[str] = field(default_factory=list)
    # Number of delegation turns executed against this endpoint so far.
    turn_count: int = 0
|
||||
|
||||
|
||||
@dataclass
class A2ADelegationState:
    """State shared across all A2A delegation tools for a single task execution.

    One instance is handed to every ``A2ADelegationTool`` built for a task, so
    tools targeting different endpoints share the same ``agent``/``task``
    context while keeping per-endpoint conversation state isolated in
    ``_per_endpoint``.
    """

    # The local agent performing the delegation (loosely typed by design).
    agent: Any
    # The local task being executed; its ``config`` seeds and records A2A IDs.
    task: Task
    # Optional registry of A2A extensions applied to metadata and responses.
    extension_registry: ExtensionRegistry | None = None
    # Lazily populated map of endpoint URL -> that endpoint's conversation state.
    _per_endpoint: dict[str, _EndpointState] = field(default_factory=dict)

    def _state_for(self, endpoint: str) -> _EndpointState:
        # Create the endpoint's state on first use, then reuse it across turns.
        return self._per_endpoint.setdefault(endpoint, _EndpointState())

    def _initial_ids_from_task(self, state: _EndpointState) -> None:
        # Seed context/task/reference IDs from the task config, but only before
        # the first turn — later turns trust the IDs learned from responses.
        if state.turn_count > 0:
            return
        task_config = self.task.config or {}
        if state.context_id is None:
            state.context_id = task_config.get("context_id")
        if state.task_id is None:
            state.task_id = task_config.get("task_id")
        if not state.reference_task_ids:
            state.reference_task_ids = list(task_config.get("reference_task_ids", []))

    def delegate(
        self,
        config: A2AConfig | A2AClientConfig,
        agent_card: AgentCard | None,
        message: str,
    ) -> str:
        """Run one delegation turn against ``config.endpoint``.

        Returns the remote agent's response text, suitable for handing back to
        the local LLM as a tool result.
        """
        return _run_delegation(self, config, agent_card, message, sync=True)

    async def adelegate(
        self,
        config: A2AConfig | A2AClientConfig,
        agent_card: AgentCard | None,
        message: str,
    ) -> str:
        """Async variant of :meth:`delegate`."""
        return await _run_delegation_async(self, config, agent_card, message)
|
||||
|
||||
|
||||
class _A2ADelegationArgs(BaseModel):
    """Argument schema for A2A delegation tools.

    Exposed to the LLM as the tool's argument JSON schema; the field
    description below is therefore user-facing prompt text, not just
    developer documentation.
    """

    message: str = Field(
        ...,
        description=(
            "The question or task to send to the remote agent. Be specific and "
            "self-contained: the remote agent does not see your other tools or "
            "your prior reasoning."
        ),
    )
|
||||
|
||||
|
||||
class A2ADelegationTool(BaseTool):
    """BaseTool that delegates one turn of conversation to a remote A2A agent.

    Each instance is bound to a specific A2A endpoint via ``_config``. Calling
    ``_run`` or ``_arun`` advances that endpoint's conversation by one turn and
    returns the remote agent's response text.
    """

    # Schema shown to the LLM: a single required ``message`` string.
    args_schema: type[BaseModel] = _A2ADelegationArgs

    # Private (non-schema) attributes; assigned by build_a2a_tools after construction.
    _config: A2AConfig | A2AClientConfig = PrivateAttr()
    _agent_card: AgentCard | None = PrivateAttr(default=None)
    _state: A2ADelegationState = PrivateAttr()

    def _run(self, message: str) -> str:
        # Synchronous single delegation turn against this tool's endpoint.
        return self._state.delegate(self._config, self._agent_card, message)

    async def _arun(self, message: str) -> str:
        # Async single delegation turn against this tool's endpoint.
        return await self._state.adelegate(self._config, self._agent_card, message)
|
||||
|
||||
|
||||
def build_a2a_tools(
    a2a_agents: list[A2AConfig | A2AClientConfig],
    agent_cards: dict[str, AgentCard],
    state: A2ADelegationState,
) -> list[BaseTool]:
    """Build one ``A2ADelegationTool`` per available A2A agent.

    Tool names collide-disambiguate with a numeric suffix; agents whose cards
    failed to fetch are skipped.
    """
    built: list[BaseTool] = []
    taken: set[str] = set()
    for cfg in a2a_agents:
        resolved_card = agent_cards.get(cfg.endpoint)
        if resolved_card is None:
            # No agent card was fetched for this endpoint — nothing to
            # describe to the LLM, so skip it entirely.
            continue
        tool_name = _build_tool_name(resolved_card.name or "remote_agent", taken)
        taken.add(tool_name)
        delegation_tool = A2ADelegationTool(
            name=tool_name,
            description=_build_tool_description(resolved_card),
            max_usage_count=cfg.max_turns,
        )
        # Bind endpoint config, card, and the shared per-task state via
        # private attributes (they are not part of the pydantic schema).
        delegation_tool._config = cfg
        delegation_tool._agent_card = resolved_card
        delegation_tool._state = state
        built.append(delegation_tool)
    return built
|
||||
|
||||
|
||||
def _build_tool_name(card_name: str, used: set[str]) -> str:
    """Derive a unique, sanitized tool name for *card_name*.

    The first collision-free candidate wins: the bare prefixed name first,
    then numeric suffixes ``_2`` through ``_999``.
    """
    primary = sanitize_tool_name(f"{_DELEGATE_PREFIX}{card_name}")
    if primary not in used:
        return primary
    # Same name already taken (duplicate card names) — append a counter.
    suffixed = (sanitize_tool_name(f"{primary}_{n}") for n in range(2, 1000))
    for option in suffixed:
        if option not in used:
            return option
    raise ValueError(f"Could not generate unique tool name for {card_name!r}")
|
||||
|
||||
|
||||
def _build_tool_description(card: AgentCard) -> str:
|
||||
lines: list[str] = [f"Delegate a task to the remote A2A agent {card.name!r}."]
|
||||
if card.description:
|
||||
lines.append(card.description.strip())
|
||||
if card.skills:
|
||||
skill_names = ", ".join(s.name for s in card.skills if s.name)
|
||||
if skill_names:
|
||||
lines.append(f"Capabilities: {skill_names}.")
|
||||
lines.append(
|
||||
"Use this tool only when the question matches the agent's capabilities. "
|
||||
"After receiving a response, prefer answering directly unless you need "
|
||||
"another round-trip."
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _run_delegation(
    state: A2ADelegationState,
    config: A2AConfig | A2AClientConfig,
    agent_card: AgentCard | None,
    message: str,
    *,
    sync: bool,
) -> str:
    """Execute one synchronous delegation turn and fold the result into state.

    Returns the remote agent's response text (or a readable error string),
    suitable for handing back to the local LLM as a tool result.
    """
    ep_state = state._state_for(config.endpoint)
    state._initial_ids_from_task(ep_state)

    ext_states = _extract_extension_states(state, ep_state)
    metadata = _merged_metadata(state, ep_state, ext_states)
    agent_branch, accepted_output_modes = _turn_context(config)

    # All per-turn arguments gathered in one place before the blocking call.
    call_kwargs: dict[str, Any] = dict(
        endpoint=config.endpoint,
        auth=config.auth,
        timeout=config.timeout,
        task_description=message,
        context_id=ep_state.context_id,
        task_id=ep_state.task_id,
        reference_task_ids=ep_state.reference_task_ids,
        metadata=metadata or None,
        extensions=(state.task.config or {}).get("extensions"),
        conversation_history=ep_state.conversation_history,
        agent_id=config.endpoint,
        agent_role=Role.user,
        agent_branch=agent_branch,
        response_model=config.response_model,
        turn_number=ep_state.turn_count + 1,
        updates=config.updates,
        transport=config.transport,
        from_task=state.task,
        from_agent=state.agent,
        # A2AConfig has no ``extensions`` attribute; only A2AClientConfig does.
        client_extensions=getattr(config, "extensions", None),
        accepted_output_modes=accepted_output_modes,
        input_files=state.task.input_files,
    )
    outcome = execute_a2a_delegation(**call_kwargs)
    return _finalize_turn(state, ep_state, config, agent_card, outcome, ext_states)
|
||||
|
||||
|
||||
async def _run_delegation_async(
    state: A2ADelegationState,
    config: A2AConfig | A2AClientConfig,
    agent_card: AgentCard | None,
    message: str,
) -> str:
    """Execute one async delegation turn and fold the result into state.

    Async counterpart of ``_run_delegation``; returns the remote agent's
    response text for the local LLM.
    """
    ep_state = state._state_for(config.endpoint)
    state._initial_ids_from_task(ep_state)

    ext_states = _extract_extension_states(state, ep_state)
    metadata = _merged_metadata(state, ep_state, ext_states)
    agent_branch, accepted_output_modes = _turn_context(config)

    # All per-turn arguments gathered in one place before the awaited call.
    call_kwargs: dict[str, Any] = dict(
        endpoint=config.endpoint,
        auth=config.auth,
        timeout=config.timeout,
        task_description=message,
        context_id=ep_state.context_id,
        task_id=ep_state.task_id,
        reference_task_ids=ep_state.reference_task_ids,
        metadata=metadata or None,
        extensions=(state.task.config or {}).get("extensions"),
        conversation_history=ep_state.conversation_history,
        agent_id=config.endpoint,
        agent_role=Role.user,
        agent_branch=agent_branch,
        response_model=config.response_model,
        turn_number=ep_state.turn_count + 1,
        updates=config.updates,
        transport=config.transport,
        from_task=state.task,
        from_agent=state.agent,
        # A2AConfig has no ``extensions`` attribute; only A2AClientConfig does.
        client_extensions=getattr(config, "extensions", None),
        accepted_output_modes=accepted_output_modes,
        input_files=state.task.input_files,
    )
    outcome = await aexecute_a2a_delegation(**call_kwargs)
    return _finalize_turn(state, ep_state, config, agent_card, outcome, ext_states)
|
||||
|
||||
|
||||
def _extract_extension_states(
|
||||
state: A2ADelegationState,
|
||||
endpoint_state: _EndpointState,
|
||||
) -> dict[type[A2AExtension], ConversationState]:
|
||||
if state.extension_registry and endpoint_state.conversation_history:
|
||||
return state.extension_registry.extract_all_states(
|
||||
endpoint_state.conversation_history
|
||||
)
|
||||
return {}
|
||||
|
||||
|
||||
def _merged_metadata(
|
||||
state: A2ADelegationState,
|
||||
endpoint_state: _EndpointState,
|
||||
extension_states: dict[type[A2AExtension], ConversationState],
|
||||
) -> dict[str, Any]:
|
||||
task_config = state.task.config or {}
|
||||
metadata: dict[str, Any] = dict(task_config.get("metadata") or {})
|
||||
if state.extension_registry and extension_states:
|
||||
metadata.update(state.extension_registry.prepare_all_metadata(extension_states))
|
||||
return metadata
|
||||
|
||||
|
||||
def _turn_context(
    config: A2AConfig | A2AClientConfig,
) -> tuple[Any | None, list[str] | None]:
    """Resolve the console branch and accepted output modes for this turn.

    NOTE(review): reaches into the event bus's private ``_console`` attribute
    for progress display — best-effort only, so every lookup is getattr-guarded.
    """
    formatter = getattr(crewai_event_bus, "_console", None)
    branch = None
    if formatter:
        branch = getattr(formatter, "current_agent_branch", None) or getattr(
            formatter, "current_task_branch", None
        )

    # Only client configs declare accepted output modes.
    modes = None
    if isinstance(config, A2AClientConfig):
        modes = config.accepted_output_modes
    return branch, modes
|
||||
|
||||
|
||||
def _finalize_turn(
    state: A2ADelegationState,
    endpoint_state: _EndpointState,
    config: A2AConfig | A2AClientConfig,
    agent_card: AgentCard | None,
    a2a_result: TaskStateResult,
    extension_states: dict[type[A2AExtension], ConversationState],
) -> str:
    """Fold one delegation result into endpoint state and build the tool output.

    Handles three outcomes: ``completed`` (record references, emit completion
    event, return the result text), ``input_required`` (return the text so the
    local LLM can reply on a later call), and anything else (emit a failure
    event and return a readable error string instead of raising).
    """
    # The result carries the full updated history; replace, don't append.
    endpoint_state.conversation_history = list(a2a_result.get("history", []))
    if endpoint_state.conversation_history:
        latest = endpoint_state.conversation_history[-1]
        # Adopt IDs from the most recent message so the next turn continues
        # the same remote context/task.
        if latest.task_id is not None:
            endpoint_state.task_id = latest.task_id
        if latest.context_id is not None:
            endpoint_state.context_id = latest.context_id

    endpoint_state.turn_count += 1
    status = a2a_result.get("status")

    if status == TaskState.completed:
        # Record the finished remote task as a reference for future turns and
        # persist the list on the local task config, then clear the live id.
        if (
            endpoint_state.task_id is not None
            and endpoint_state.task_id not in endpoint_state.reference_task_ids
        ):
            endpoint_state.reference_task_ids.append(endpoint_state.task_id)
            if state.task.config is None:
                state.task.config = {}
            state.task.config["reference_task_ids"] = list(
                endpoint_state.reference_task_ids
            )
        endpoint_state.task_id = None

        result_text = str(a2a_result.get("result", ""))
        crewai_event_bus.emit(
            None,
            A2AConversationCompletedEvent(
                status="completed",
                final_result=result_text,
                error=None,
                total_turns=endpoint_state.turn_count,
                from_task=state.task,
                from_agent=state.agent,
                endpoint=config.endpoint,
                a2a_agent_name=agent_card.name if agent_card else None,
                agent_card=agent_card.model_dump() if agent_card else None,
            ),
        )
        return _apply_response_extensions(state, result_text, extension_states)

    if status == TaskState.input_required:
        # Mid-conversation: the remote agent wants more input, so no completion
        # event is emitted — the local LLM may call the tool again.
        result_text = str(a2a_result.get("result", ""))
        return _apply_response_extensions(state, result_text, extension_states)

    # Any other status is treated as failure: emit a failed-completion event
    # and surface the error as the tool result rather than raising.
    error_msg = a2a_result.get("error", "Unknown error")
    crewai_event_bus.emit(
        None,
        A2AConversationCompletedEvent(
            status="failed",
            final_result=None,
            error=error_msg,
            total_turns=endpoint_state.turn_count,
            from_task=state.task,
            from_agent=state.agent,
            endpoint=config.endpoint,
            a2a_agent_name=agent_card.name if agent_card else None,
            agent_card=agent_card.model_dump() if agent_card else None,
        ),
    )
    return f"Remote agent error: {error_msg}"
|
||||
|
||||
|
||||
def _apply_response_extensions(
|
||||
state: A2ADelegationState,
|
||||
response_text: str,
|
||||
extension_states: dict[type[A2AExtension], ConversationState],
|
||||
) -> str:
|
||||
if not state.extension_registry:
|
||||
return response_text
|
||||
processed = state.extension_registry.process_response_with_all(
|
||||
response_text, extension_states
|
||||
)
|
||||
return processed if isinstance(processed, str) else str(processed)
|
||||
@@ -6,8 +6,6 @@ from typing import (
|
||||
Annotated,
|
||||
Any,
|
||||
Literal,
|
||||
Protocol,
|
||||
runtime_checkable,
|
||||
)
|
||||
|
||||
from pydantic import BeforeValidator, HttpUrl, TypeAdapter
|
||||
@@ -57,15 +55,6 @@ Url = Annotated[
|
||||
]
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class AgentResponseProtocol(Protocol):
|
||||
"""Protocol for the dynamically created AgentResponse model."""
|
||||
|
||||
a2a_ids: tuple[str, ...]
|
||||
message: str
|
||||
is_a2a: bool
|
||||
|
||||
|
||||
class PartsMetadataDict(TypedDict, total=False):
|
||||
"""Metadata for A2A message parts.
|
||||
|
||||
|
||||
@@ -1,75 +1,25 @@
|
||||
"""Response model utilities for A2A agent interactions."""
|
||||
"""Helpers for extracting A2A client configurations."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TypeAlias
|
||||
|
||||
from pydantic import BaseModel, Field, create_model
|
||||
|
||||
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
|
||||
from crewai.types.utils import create_literals_from_strings
|
||||
|
||||
|
||||
A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig
|
||||
A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig
|
||||
|
||||
|
||||
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel] | None:
|
||||
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
|
||||
|
||||
Args:
|
||||
agent_ids: List of available A2A agent IDs.
|
||||
|
||||
Returns:
|
||||
Dynamically created Pydantic model with Literal-constrained a2a_ids field,
|
||||
or None if agent_ids is empty.
|
||||
"""
|
||||
if not agent_ids:
|
||||
return None
|
||||
|
||||
DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
|
||||
|
||||
return create_model(
|
||||
"AgentResponse",
|
||||
a2a_ids=(
|
||||
tuple[DynamicLiteral, ...], # type: ignore[valid-type]
|
||||
Field(
|
||||
default_factory=tuple,
|
||||
max_length=len(agent_ids),
|
||||
description="A2A agent IDs to delegate to.",
|
||||
),
|
||||
),
|
||||
message=(
|
||||
str,
|
||||
Field(
|
||||
description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
|
||||
),
|
||||
),
|
||||
is_a2a=(
|
||||
bool,
|
||||
Field(
|
||||
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
|
||||
),
|
||||
),
|
||||
__base__=BaseModel,
|
||||
)
|
||||
|
||||
|
||||
def extract_a2a_agent_ids_from_config(
|
||||
def extract_a2a_client_configs(
|
||||
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
|
||||
) -> tuple[list[A2AClientConfigTypes], tuple[str, ...]]:
|
||||
"""Extract A2A agent IDs from A2A configuration.
|
||||
) -> list[A2AClientConfigTypes]:
|
||||
"""Return the client-side A2A configs from a possibly-mixed config list.
|
||||
|
||||
Filters out A2AServerConfig since it doesn't have an endpoint for delegation.
|
||||
|
||||
Args:
|
||||
a2a_config: A2A configuration (any type).
|
||||
|
||||
Returns:
|
||||
Tuple of client A2A configs list and agent endpoint IDs.
|
||||
Filters out :class:`A2AServerConfig`, which has no endpoint to delegate to.
|
||||
"""
|
||||
if a2a_config is None:
|
||||
return [], ()
|
||||
return []
|
||||
|
||||
configs: list[A2AConfigTypes]
|
||||
if isinstance(a2a_config, (A2AConfig, A2AClientConfig, A2AServerConfig)):
|
||||
@@ -77,24 +27,6 @@ def extract_a2a_agent_ids_from_config(
|
||||
else:
|
||||
configs = a2a_config
|
||||
|
||||
client_configs: list[A2AClientConfigTypes] = [
|
||||
return [
|
||||
config for config in configs if isinstance(config, (A2AConfig, A2AClientConfig))
|
||||
]
|
||||
|
||||
return client_configs, tuple(config.endpoint for config in client_configs)
|
||||
|
||||
|
||||
def get_a2a_agents_and_response_model(
|
||||
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
|
||||
) -> tuple[list[A2AClientConfigTypes], type[BaseModel] | None]:
|
||||
"""Get A2A agent configs and response model.
|
||||
|
||||
Args:
|
||||
a2a_config: A2A configuration (any type).
|
||||
|
||||
Returns:
|
||||
Tuple of client A2A configs and response model.
|
||||
"""
|
||||
a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
|
||||
|
||||
return a2a_agents, create_agent_response_model(agent_ids)
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -111,12 +111,6 @@ from crewai.utilities.token_counter_callback import TokenCalcHandler
|
||||
from crewai.utilities.training_handler import CrewTrainingHandler
|
||||
|
||||
|
||||
try:
|
||||
from crewai.a2a.types import AgentResponseProtocol
|
||||
except ImportError:
|
||||
AgentResponseProtocol = None # type: ignore[assignment, misc]
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai_files import FileInput
|
||||
|
||||
@@ -632,15 +626,7 @@ class Agent(BaseAgent):
|
||||
|
||||
result = process_tool_results(self, result)
|
||||
|
||||
output_for_event = result
|
||||
if (
|
||||
AgentResponseProtocol is not None
|
||||
and isinstance(result, BaseModel)
|
||||
and isinstance(result, AgentResponseProtocol)
|
||||
):
|
||||
output_for_event = str(result.message)
|
||||
elif not isinstance(result, str):
|
||||
output_for_event = str(result)
|
||||
output_for_event = result if isinstance(result, str) else str(result)
|
||||
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
|
||||
@@ -121,11 +121,11 @@ def _kickoff_with_a2a_support(
|
||||
Returns:
|
||||
LiteAgentOutput from either local execution or A2A delegation.
|
||||
"""
|
||||
from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model
|
||||
from crewai.a2a.utils.response_model import extract_a2a_client_configs
|
||||
from crewai.a2a.wrapper import _execute_task_with_a2a
|
||||
from crewai.task import Task
|
||||
|
||||
a2a_agents, agent_response_model = get_a2a_agents_and_response_model(agent.a2a)
|
||||
a2a_agents = extract_a2a_client_configs(agent.a2a)
|
||||
|
||||
if not a2a_agents:
|
||||
return original_kickoff(messages, response_format, input_files)
|
||||
@@ -160,7 +160,6 @@ def _kickoff_with_a2a_support(
|
||||
a2a_agents=a2a_agents,
|
||||
original_fn=task_to_kickoff_adapter,
|
||||
task=fake_task,
|
||||
agent_response_model=agent_response_model,
|
||||
context=None,
|
||||
tools=None,
|
||||
extension_registry=extension_registry,
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
"""Test trust_remote_completion_status flag in A2A wrapper."""
|
||||
"""Tests for A2A delegation tool behavior, including trust_remote_completion_status."""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.a2a.config import A2AConfig
|
||||
from crewai.a2a.config import A2AClientConfig, A2AConfig
|
||||
|
||||
|
||||
try:
|
||||
from a2a.types import Message, Role
|
||||
from a2a.types import TaskState # noqa: F401
|
||||
|
||||
A2A_SDK_INSTALLED = True
|
||||
except ImportError:
|
||||
@@ -15,141 +16,126 @@ except ImportError:
|
||||
|
||||
|
||||
def _create_mock_agent_card(name: str = "Test", url: str = "http://test-endpoint.com/"):
|
||||
"""Create a mock agent card with proper model_dump behavior."""
|
||||
"""Create a mock agent card with the attributes A2ADelegationTool reads."""
|
||||
mock_card = MagicMock()
|
||||
mock_card.name = name
|
||||
mock_card.url = url
|
||||
mock_card.description = "A test agent"
|
||||
mock_card.skills = []
|
||||
mock_card.model_dump.return_value = {"name": name, "url": url}
|
||||
mock_card.model_dump_json.return_value = f'{{"name": "{name}", "url": "{url}"}}'
|
||||
return mock_card
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
|
||||
def test_trust_remote_completion_status_true_returns_directly():
|
||||
"""When trust_remote_completion_status=True and A2A returns completed, return result directly."""
|
||||
from crewai.a2a.wrapper import _delegate_to_a2a
|
||||
from crewai.a2a.types import AgentResponseProtocol
|
||||
def test_delegation_tool_returns_remote_result_on_completion():
|
||||
"""A successful remote completion is returned to the local LLM as the tool result."""
|
||||
from a2a.types import TaskState
|
||||
|
||||
from crewai import Agent, Task
|
||||
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
|
||||
|
||||
a2a_config = A2AConfig(
|
||||
endpoint="http://test-endpoint.com",
|
||||
trust_remote_completion_status=True,
|
||||
)
|
||||
|
||||
agent = Agent(
|
||||
role="test manager",
|
||||
goal="coordinate",
|
||||
backstory="test",
|
||||
a2a=a2a_config,
|
||||
)
|
||||
|
||||
config = A2AClientConfig(endpoint="http://test-endpoint.com")
|
||||
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
|
||||
task = Task(description="test", expected_output="test", agent=agent)
|
||||
|
||||
class MockResponse:
|
||||
is_a2a = True
|
||||
message = "Please help"
|
||||
a2a_ids = ["http://test-endpoint.com/"]
|
||||
card = _create_mock_agent_card()
|
||||
state = A2ADelegationState(agent=agent, task=task)
|
||||
tools = build_a2a_tools([config], {config.endpoint: card}, state)
|
||||
assert len(tools) == 1
|
||||
tool = tools[0]
|
||||
|
||||
with (
|
||||
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
|
||||
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
|
||||
):
|
||||
mock_card = _create_mock_agent_card()
|
||||
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
|
||||
|
||||
# A2A returns completed
|
||||
with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
|
||||
mock_execute.return_value = {
|
||||
"status": "completed",
|
||||
"status": TaskState.completed,
|
||||
"result": "Done by remote",
|
||||
"history": [],
|
||||
}
|
||||
result = tool._run(message="Please help")
|
||||
|
||||
# This should return directly without checking LLM response
|
||||
result = _delegate_to_a2a(
|
||||
self=agent,
|
||||
agent_response=MockResponse(),
|
||||
task=task,
|
||||
original_fn=lambda *args, **kwargs: "fallback",
|
||||
context=None,
|
||||
tools=None,
|
||||
agent_cards={"http://test-endpoint.com/": mock_card},
|
||||
original_task_description="test",
|
||||
)
|
||||
|
||||
assert result == "Done by remote"
|
||||
assert mock_execute.call_count == 1
|
||||
assert result == "Done by remote"
|
||||
assert mock_execute.call_count == 1
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
|
||||
def test_trust_remote_completion_status_false_continues_conversation():
|
||||
"""When trust_remote_completion_status=False and A2A returns completed, ask server agent."""
|
||||
from crewai.a2a.wrapper import _delegate_to_a2a
|
||||
def test_delegation_tool_records_completed_task_in_references():
|
||||
"""When a remote task completes with a task_id, it goes into reference_task_ids."""
|
||||
from a2a.types import TaskState
|
||||
|
||||
from crewai import Agent, Task
|
||||
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
|
||||
|
||||
a2a_config = A2AConfig(
|
||||
endpoint="http://test-endpoint.com",
|
||||
trust_remote_completion_status=False,
|
||||
)
|
||||
|
||||
agent = Agent(
|
||||
role="test manager",
|
||||
goal="coordinate",
|
||||
backstory="test",
|
||||
a2a=a2a_config,
|
||||
)
|
||||
|
||||
config = A2AClientConfig(endpoint="http://test-endpoint.com")
|
||||
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
|
||||
task = Task(description="test", expected_output="test", agent=agent)
|
||||
|
||||
class MockResponse:
|
||||
is_a2a = True
|
||||
message = "Please help"
|
||||
a2a_ids = ["http://test-endpoint.com/"]
|
||||
card = _create_mock_agent_card()
|
||||
state = A2ADelegationState(agent=agent, task=task)
|
||||
[tool] = build_a2a_tools([config], {config.endpoint: card}, state)
|
||||
|
||||
call_count = 0
|
||||
history_msg = MagicMock()
|
||||
history_msg.task_id = "remote-task-1"
|
||||
history_msg.context_id = "ctx-1"
|
||||
|
||||
def mock_original_fn(self, task, context, tools):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count == 1:
|
||||
# Server decides to finish
|
||||
return '{"is_a2a": false, "message": "Server final answer", "a2a_ids": []}'
|
||||
return "unexpected"
|
||||
|
||||
with (
|
||||
patch("crewai.a2a.wrapper.execute_a2a_delegation") as mock_execute,
|
||||
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
|
||||
):
|
||||
mock_card = _create_mock_agent_card()
|
||||
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})
|
||||
|
||||
# A2A returns completed
|
||||
with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
|
||||
mock_execute.return_value = {
|
||||
"status": "completed",
|
||||
"result": "Done by remote",
|
||||
"history": [],
|
||||
"status": TaskState.completed,
|
||||
"result": "Done",
|
||||
"history": [history_msg],
|
||||
}
|
||||
tool._run(message="Please help")
|
||||
|
||||
result = _delegate_to_a2a(
|
||||
self=agent,
|
||||
agent_response=MockResponse(),
|
||||
task=task,
|
||||
original_fn=mock_original_fn,
|
||||
context=None,
|
||||
tools=None,
|
||||
agent_cards={"http://test-endpoint.com/": mock_card},
|
||||
original_task_description="test",
|
||||
)
|
||||
|
||||
# Should call original_fn to get server response
|
||||
assert call_count >= 1
|
||||
assert result == "Server final answer"
|
||||
endpoint_state = state._per_endpoint[config.endpoint]
|
||||
assert "remote-task-1" in endpoint_state.reference_task_ids
|
||||
assert endpoint_state.task_id is None
|
||||
assert task.config is not None
|
||||
assert task.config["reference_task_ids"] == ["remote-task-1"]
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
|
||||
def test_delegation_tool_returns_error_message_on_failure():
|
||||
"""A non-completed/non-input-required status surfaces as a readable error string."""
|
||||
from a2a.types import TaskState
|
||||
|
||||
from crewai import Agent, Task
|
||||
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
|
||||
|
||||
config = A2AClientConfig(endpoint="http://test-endpoint.com")
|
||||
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
|
||||
task = Task(description="test", expected_output="test", agent=agent)
|
||||
|
||||
card = _create_mock_agent_card()
|
||||
state = A2ADelegationState(agent=agent, task=task)
|
||||
[tool] = build_a2a_tools([config], {config.endpoint: card}, state)
|
||||
|
||||
with patch("crewai.a2a.tools.execute_a2a_delegation") as mock_execute:
|
||||
mock_execute.return_value = {
|
||||
"status": TaskState.failed,
|
||||
"error": "remote agent unreachable",
|
||||
"history": [],
|
||||
}
|
||||
result = tool._run(message="Please help")
|
||||
|
||||
assert "remote agent unreachable" in result
|
||||
|
||||
|
||||
@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
|
||||
def test_delegation_tool_respects_max_turns_via_usage_count():
|
||||
"""A2AConfig.max_turns wires through to BaseTool.max_usage_count."""
|
||||
from crewai import Agent, Task
|
||||
from crewai.a2a.tools import A2ADelegationState, build_a2a_tools
|
||||
|
||||
config = A2AClientConfig(endpoint="http://test-endpoint.com", max_turns=2)
|
||||
agent = Agent(role="manager", goal="coordinate", backstory="test", a2a=config)
|
||||
task = Task(description="test", expected_output="test", agent=agent)
|
||||
|
||||
card = _create_mock_agent_card()
|
||||
state = A2ADelegationState(agent=agent, task=task)
|
||||
[tool] = build_a2a_tools([config], {config.endpoint: card}, state)
|
||||
|
||||
assert tool.max_usage_count == 2
|
||||
|
||||
|
||||
def test_default_trust_remote_completion_status_is_false():
|
||||
"""Verify that default value of trust_remote_completion_status is False."""
|
||||
a2a_config = A2AConfig(
|
||||
endpoint="http://test-endpoint.com",
|
||||
)
|
||||
|
||||
a2a_config = A2AConfig(endpoint="http://test-endpoint.com")
|
||||
assert a2a_config.trust_remote_completion_status is False
|
||||
Reference in New Issue
Block a user