diff --git a/lib/crewai/src/crewai/a2a/utils/__init__.py b/lib/crewai/src/crewai/a2a/utils/__init__.py new file mode 100644 index 000000000..bdb7bed62 --- /dev/null +++ b/lib/crewai/src/crewai/a2a/utils/__init__.py @@ -0,0 +1 @@ +"""A2A utility modules for client operations.""" diff --git a/lib/crewai/src/crewai/a2a/utils/agent_card.py b/lib/crewai/src/crewai/a2a/utils/agent_card.py new file mode 100644 index 000000000..20e621473 --- /dev/null +++ b/lib/crewai/src/crewai/a2a/utils/agent_card.py @@ -0,0 +1,360 @@ +"""AgentCard utilities for A2A client and server operations.""" + +from __future__ import annotations + +import asyncio +from collections.abc import MutableMapping +from functools import lru_cache +import time +from types import MethodType +from typing import TYPE_CHECKING + +from a2a.client.errors import A2AClientHTTPError +from a2a.types import AgentCapabilities, AgentCard, AgentSkill +from aiocache import cached # type: ignore[import-untyped] +from aiocache.serializers import PickleSerializer # type: ignore[import-untyped] +import httpx + +from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth +from crewai.a2a.auth.utils import ( + _auth_store, + configure_auth_client, + retry_on_401, +) +from crewai.crew import Crew + + +if TYPE_CHECKING: + from crewai.a2a.auth.schemas import AuthScheme + from crewai.agent import Agent + from crewai.task import Task + + +def fetch_agent_card( + endpoint: str, + auth: AuthScheme | None = None, + timeout: int = 30, + use_cache: bool = True, + cache_ttl: int = 300, +) -> AgentCard: + """Fetch AgentCard from an A2A endpoint with optional caching. + + Args: + endpoint: A2A agent endpoint URL (AgentCard URL). + auth: Optional AuthScheme for authentication. + timeout: Request timeout in seconds. + use_cache: Whether to use caching (default True). + cache_ttl: Cache TTL in seconds (default 300 = 5 minutes). + + Returns: + AgentCard object with agent capabilities and skills. + + Raises: + httpx.HTTPStatusError: If the request fails. + A2AClientHTTPError: If authentication fails. + """ + if use_cache: + if auth: + auth_data = auth.model_dump_json( + exclude={ + "_access_token", + "_token_expires_at", + "_refresh_token", + "_authorization_callback", + } + ) + auth_hash = hash((type(auth).__name__, auth_data)) + else: + auth_hash = 0 + _auth_store[auth_hash] = auth + ttl_hash = int(time.time() // cache_ttl) + return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete( + afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout) + ) + finally: + loop.close() + + +async def afetch_agent_card( + endpoint: str, + auth: AuthScheme | None = None, + timeout: int = 30, + use_cache: bool = True, +) -> AgentCard: + """Fetch AgentCard from an A2A endpoint asynchronously. + + Native async implementation. Use this when running in an async context. + + Args: + endpoint: A2A agent endpoint URL (AgentCard URL). + auth: Optional AuthScheme for authentication. + timeout: Request timeout in seconds. + use_cache: Whether to use caching (default True). + + Returns: + AgentCard object with agent capabilities and skills. + + Raises: + httpx.HTTPStatusError: If the request fails. + A2AClientHTTPError: If authentication fails. + """ + if use_cache: + if auth: + auth_data = auth.model_dump_json( + exclude={ + "_access_token", + "_token_expires_at", + "_refresh_token", + "_authorization_callback", + } + ) + auth_hash = hash((type(auth).__name__, auth_data)) + else: + auth_hash = 0 + _auth_store[auth_hash] = auth + agent_card: AgentCard = await _afetch_agent_card_cached( + endpoint, auth_hash, timeout + ) + return agent_card + + return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) + + +@lru_cache() +def _fetch_agent_card_cached( + endpoint: str, + auth_hash: int, + timeout: int, + _ttl_hash: int, +) -> AgentCard: + """Cached sync version of fetch_agent_card.""" + auth = _auth_store.get(auth_hash) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete( + _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) + ) + finally: + loop.close() + + +@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator] +async def _afetch_agent_card_cached( + endpoint: str, + auth_hash: int, + timeout: int, +) -> AgentCard: + """Cached async implementation of AgentCard fetching.""" + auth = _auth_store.get(auth_hash) + return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) + + +async def _afetch_agent_card_impl( + endpoint: str, + auth: AuthScheme | None, + timeout: int, +) -> AgentCard: + """Internal async implementation of AgentCard fetching.""" + if "/.well-known/agent-card.json" in endpoint: + base_url = endpoint.replace("/.well-known/agent-card.json", "") + agent_card_path = "/.well-known/agent-card.json" + else: + url_parts = endpoint.split("/", 3) + base_url = f"{url_parts[0]}//{url_parts[2]}" + agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/" + + headers: MutableMapping[str, str] = {} + if auth: + async with httpx.AsyncClient(timeout=timeout) as temp_auth_client: + if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)): + configure_auth_client(auth, temp_auth_client) + headers = await auth.apply_auth(temp_auth_client, {}) + + async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client: + if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)): + configure_auth_client(auth, temp_client) + + agent_card_url = f"{base_url}{agent_card_path}" + + async def _fetch_agent_card_request() -> httpx.Response: + return await temp_client.get(agent_card_url) + + try: + response = await retry_on_401( + request_func=_fetch_agent_card_request, + auth_scheme=auth, + client=temp_client, + headers=temp_client.headers, + max_retries=2, + ) + response.raise_for_status() + + return AgentCard.model_validate(response.json()) + + except httpx.HTTPStatusError as e: + if e.response.status_code == 401: + error_details = ["Authentication failed"] + www_auth = e.response.headers.get("WWW-Authenticate") + if www_auth: + error_details.append(f"WWW-Authenticate: {www_auth}") + if not auth: + error_details.append("No auth scheme provided") + msg = " | ".join(error_details) + raise A2AClientHTTPError(401, msg) from e + raise + + +def _task_to_skill(task: Task) -> AgentSkill: + """Convert a CrewAI Task to an A2A AgentSkill. + + Args: + task: The CrewAI Task to convert. + + Returns: + AgentSkill representing the task's capability. + """ + task_name = task.name or task.description[:50] + task_id = task_name.lower().replace(" ", "_") + + tags: list[str] = [] + if task.agent: + tags.append(task.agent.role.lower().replace(" ", "-")) + + return AgentSkill( + id=task_id, + name=task_name, + description=task.description, + tags=tags, + examples=[task.expected_output] if task.expected_output else None, + ) + + +def _tool_to_skill(tool_name: str, tool_description: str) -> AgentSkill: + """Convert an Agent's tool to an A2A AgentSkill. + + Args: + tool_name: Name of the tool. + tool_description: Description of what the tool does. + + Returns: + AgentSkill representing the tool's capability. + """ + tool_id = tool_name.lower().replace(" ", "_") + + return AgentSkill( + id=tool_id, + name=tool_name, + description=tool_description, + tags=[tool_name.lower().replace(" ", "-")], + ) + + +def _crew_to_agent_card(crew: Crew, url: str) -> AgentCard: + """Generate an A2A AgentCard from a Crew instance. + + Args: + crew: The Crew instance to generate a card for. + url: The base URL where this crew will be exposed. + + Returns: + AgentCard describing the crew's capabilities. + """ + crew_name = getattr(crew, "name", None) or crew.__class__.__name__ + + description_parts: list[str] = [] + crew_description = getattr(crew, "description", None) + if crew_description: + description_parts.append(crew_description) + else: + agent_roles = [agent.role for agent in crew.agents] + description_parts.append( + f"A crew of {len(crew.agents)} agents: {', '.join(agent_roles)}" + ) + + skills = [_task_to_skill(task) for task in crew.tasks] + + return AgentCard( + name=crew_name, + description=" ".join(description_parts), + url=url, + version="1.0.0", + capabilities=AgentCapabilities( + streaming=True, + push_notifications=True, + ), + default_input_modes=["text/plain", "application/json"], + default_output_modes=["text/plain", "application/json"], + skills=skills, + ) + + +def _agent_to_agent_card(agent: Agent, url: str) -> AgentCard: + """Generate an A2A AgentCard from an Agent instance. + + Args: + agent: The Agent instance to generate a card for. + url: The base URL where this agent will be exposed. + + Returns: + AgentCard describing the agent's capabilities. + """ + description_parts = [agent.goal] + if agent.backstory: + description_parts.append(agent.backstory) + + skills: list[AgentSkill] = [] + + if agent.tools: + for tool in agent.tools: + tool_name = getattr(tool, "name", None) or tool.__class__.__name__ + tool_desc = getattr(tool, "description", None) or f"Tool: {tool_name}" + skills.append(_tool_to_skill(tool_name, tool_desc)) + + if not skills: + skills.append( + AgentSkill( + id=agent.role.lower().replace(" ", "_"), + name=agent.role, + description=agent.goal, + tags=[agent.role.lower().replace(" ", "-")], + ) + ) + + return AgentCard( + name=agent.role, + description=" ".join(description_parts), + url=url, + version="1.0.0", + capabilities=AgentCapabilities( + streaming=True, + push_notifications=True, + ), + default_input_modes=["text/plain", "application/json"], + default_output_modes=["text/plain", "application/json"], + skills=skills, + ) + + +def inject_a2a_server_methods(target: Crew | Agent) -> None: + """Inject A2A server methods onto a Crew or Agent instance. + + Adds a `to_agent_card(url: str) -> AgentCard` method to the target + instance that generates an A2A-compliant AgentCard. + + Args: + target: The Crew or Agent instance to inject methods onto. + """ + + def _to_agent_card(self: Crew | Agent, url: str) -> AgentCard: + if isinstance(self, Crew): + return _crew_to_agent_card(self, url) + return _agent_to_agent_card(self, url) + + target.to_agent_card = MethodType(_to_agent_card, target) # type: ignore[union-attr] diff --git a/lib/crewai/src/crewai/a2a/utils.py b/lib/crewai/src/crewai/a2a/utils/delegation.py similarity index 61% rename from lib/crewai/src/crewai/a2a/utils.py rename to lib/crewai/src/crewai/a2a/utils/delegation.py index 4b3ba23e9..ac65a6088 100644 --- a/lib/crewai/src/crewai/a2a/utils.py +++ b/lib/crewai/src/crewai/a2a/utils/delegation.py @@ -1,16 +1,14 @@ -"""Utility functions for A2A (Agent-to-Agent) protocol delegation.""" +"""A2A delegation utilities for executing tasks on remote agents.""" from __future__ import annotations import asyncio from collections.abc import AsyncIterator, MutableMapping from contextlib import asynccontextmanager -from functools import lru_cache -import time from typing import TYPE_CHECKING, Any import uuid -from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory +from a2a.client import Client, ClientConfig, ClientFactory from a2a.types import ( AgentCard, Message, @@ -20,19 +18,15 @@ from a2a.types import ( TextPart, TransportProtocol, ) -from aiocache import cached # type: ignore[import-untyped] -from aiocache.serializers import PickleSerializer # type: ignore[import-untyped] import httpx -from pydantic import BaseModel, Field, create_model +from pydantic import BaseModel from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth from crewai.a2a.auth.utils import ( _auth_store, configure_auth_client, - retry_on_401, validate_auth_against_agent_card, ) -from crewai.a2a.config import A2AConfig from crewai.a2a.task_helpers import TaskStateResult from crewai.a2a.types import ( HANDLER_REGISTRY, @@ -46,6 +40,7 @@ from crewai.a2a.updates import ( StreamingHandler, UpdateConfig, ) +from crewai.a2a.utils.agent_card import _afetch_agent_card_cached from crewai.events.event_bus import crewai_event_bus from crewai.events.types.a2a_events import ( A2AConversationStartedEvent, @@ -53,7 +48,6 @@ from crewai.events.types.a2a_events import ( A2ADelegationStartedEvent, A2AMessageSentEvent, ) -from crewai.types.utils import create_literals_from_strings if TYPE_CHECKING: @@ -76,187 +70,6 @@ def get_handler(config: UpdateConfig | None) -> HandlerType: return HANDLER_REGISTRY.get(type(config), StreamingHandler) -@lru_cache() -def _fetch_agent_card_cached( - endpoint: str, - auth_hash: int, - timeout: int, - _ttl_hash: int, -) -> AgentCard: - """Cached sync version of fetch_agent_card.""" - auth = _auth_store.get(auth_hash) - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete( - _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) - ) - finally: - loop.close() - - -def fetch_agent_card( - endpoint: str, - auth: AuthScheme | None = None, - timeout: int = 30, - use_cache: bool = True, - cache_ttl: int = 300, -) -> AgentCard: - """Fetch AgentCard from an A2A endpoint with optional caching. - - Args: - endpoint: A2A agent endpoint URL (AgentCard URL) - auth: Optional AuthScheme for authentication - timeout: Request timeout in seconds - use_cache: Whether to use caching (default True) - cache_ttl: Cache TTL in seconds (default 300 = 5 minutes) - - Returns: - AgentCard object with agent capabilities and skills - - Raises: - httpx.HTTPStatusError: If the request fails - A2AClientHTTPError: If authentication fails - """ - if use_cache: - if auth: - auth_data = auth.model_dump_json( - exclude={ - "_access_token", - "_token_expires_at", - "_refresh_token", - "_authorization_callback", - } - ) - auth_hash = hash((type(auth).__name__, auth_data)) - else: - auth_hash = 0 - _auth_store[auth_hash] = auth - ttl_hash = int(time.time() // cache_ttl) - return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash) - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete( - afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout) - ) - finally: - loop.close() - - -async def afetch_agent_card( - endpoint: str, - auth: AuthScheme | None = None, - timeout: int = 30, - use_cache: bool = True, -) -> AgentCard: - """Fetch AgentCard from an A2A endpoint asynchronously. - - Native async implementation. Use this when running in an async context. - - Args: - endpoint: A2A agent endpoint URL (AgentCard URL). - auth: Optional AuthScheme for authentication. - timeout: Request timeout in seconds. - use_cache: Whether to use caching (default True). - - Returns: - AgentCard object with agent capabilities and skills. - - Raises: - httpx.HTTPStatusError: If the request fails. - A2AClientHTTPError: If authentication fails. - """ - if use_cache: - if auth: - auth_data = auth.model_dump_json( - exclude={ - "_access_token", - "_token_expires_at", - "_refresh_token", - "_authorization_callback", - } - ) - auth_hash = hash((type(auth).__name__, auth_data)) - else: - auth_hash = 0 - _auth_store[auth_hash] = auth - agent_card: AgentCard = await _afetch_agent_card_cached( - endpoint, auth_hash, timeout - ) - return agent_card - - return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) - - -@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator] -async def _afetch_agent_card_cached( - endpoint: str, - auth_hash: int, - timeout: int, -) -> AgentCard: - """Cached async implementation of AgentCard fetching.""" - auth = _auth_store.get(auth_hash) - return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) - - -async def _afetch_agent_card_impl( - endpoint: str, - auth: AuthScheme | None, - timeout: int, -) -> AgentCard: - """Internal async implementation of AgentCard fetching.""" - if "/.well-known/agent-card.json" in endpoint: - base_url = endpoint.replace("/.well-known/agent-card.json", "") - agent_card_path = "/.well-known/agent-card.json" - else: - url_parts = endpoint.split("/", 3) - base_url = f"{url_parts[0]}//{url_parts[2]}" - agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/" - - headers: MutableMapping[str, str] = {} - if auth: - async with httpx.AsyncClient(timeout=timeout) as temp_auth_client: - if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)): - configure_auth_client(auth, temp_auth_client) - headers = await auth.apply_auth(temp_auth_client, {}) - - async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client: - if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)): - configure_auth_client(auth, temp_client) - - agent_card_url = f"{base_url}{agent_card_path}" - - async def _fetch_agent_card_request() -> httpx.Response: - return await temp_client.get(agent_card_url) - - try: - response = await retry_on_401( - request_func=_fetch_agent_card_request, - auth_scheme=auth, - client=temp_client, - headers=temp_client.headers, - max_retries=2, - ) - response.raise_for_status() - - return AgentCard.model_validate(response.json()) - - except httpx.HTTPStatusError as e: - if e.response.status_code == 401: - error_details = ["Authentication failed"] - www_auth = e.response.headers.get("WWW-Authenticate") - if www_auth: - error_details.append(f"WWW-Authenticate: {www_auth}") - if not auth: - error_details.append("No auth scheme provided") - msg = " | ".join(error_details) - raise A2AClientHTTPError(401, msg) from e - raise - - def execute_a2a_delegation( endpoint: str, auth: AuthScheme | None, @@ -607,19 +420,18 @@ async def _create_a2a_client( """Create and configure an A2A client. Args: - agent_card: The A2A agent card - transport_protocol: Transport protocol to use - timeout: Request timeout in seconds - headers: HTTP headers (already with auth applied) - streaming: Enable streaming responses - auth: Optional AuthScheme for client configuration - use_polling: Enable polling mode - push_notification_config: Optional push notification config to include in requests + agent_card: The A2A agent card. + transport_protocol: Transport protocol to use. + timeout: Request timeout in seconds. + headers: HTTP headers (already with auth applied). + streaming: Enable streaming responses. + auth: Optional AuthScheme for client configuration. + use_polling: Enable polling mode. + push_notification_config: Optional push notification config. Yields: - Configured A2A client instance + Configured A2A client instance. """ - async with httpx.AsyncClient( timeout=timeout, headers=headers, @@ -650,78 +462,3 @@ async def _create_a2a_client( factory = ClientFactory(config) client = factory.create(agent_card) yield client - - -def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]: - """Create a dynamic AgentResponse model with Literal types for agent IDs. - - Args: - agent_ids: List of available A2A agent IDs - - Returns: - Dynamically created Pydantic model with Literal-constrained a2a_ids field - """ - - DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806 - - return create_model( - "AgentResponse", - a2a_ids=( - tuple[DynamicLiteral, ...], # type: ignore[valid-type] - Field( - default_factory=tuple, - max_length=len(agent_ids), - description="A2A agent IDs to delegate to.", - ), - ), - message=( - str, - Field( - description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation." - ), - ), - is_a2a=( - bool, - Field( - description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately." - ), - ), - __base__=BaseModel, - ) - - -def extract_a2a_agent_ids_from_config( - a2a_config: list[A2AConfig] | A2AConfig | None, -) -> tuple[list[A2AConfig], tuple[str, ...]]: - """Extract A2A agent IDs from A2A configuration. - - Args: - a2a_config: A2A configuration - - Returns: - List of A2A agent IDs - """ - if a2a_config is None: - return [], () - - if isinstance(a2a_config, A2AConfig): - a2a_agents = [a2a_config] - else: - a2a_agents = a2a_config - return a2a_agents, tuple(config.endpoint for config in a2a_agents) - - -def get_a2a_agents_and_response_model( - a2a_config: list[A2AConfig] | A2AConfig | None, -) -> tuple[list[A2AConfig], type[BaseModel]]: - """Get A2A agent IDs and response model. - - Args: - a2a_config: A2A configuration - - Returns: - Tuple of A2A agent IDs and response model - """ - a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config) - - return a2a_agents, create_agent_response_model(agent_ids) diff --git a/lib/crewai/src/crewai/a2a/utils/response_model.py b/lib/crewai/src/crewai/a2a/utils/response_model.py new file mode 100644 index 000000000..9da0f35ea --- /dev/null +++ b/lib/crewai/src/crewai/a2a/utils/response_model.py @@ -0,0 +1,82 @@ +"""Response model utilities for A2A agent interactions.""" + +from __future__ import annotations + +from pydantic import BaseModel, Field, create_model + +from crewai.a2a.config import A2AConfig +from crewai.types.utils import create_literals_from_strings + + +def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]: + """Create a dynamic AgentResponse model with Literal types for agent IDs. + + Args: + agent_ids: List of available A2A agent IDs. + + Returns: + Dynamically created Pydantic model with Literal-constrained a2a_ids field. + """ + DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806 + + return create_model( + "AgentResponse", + a2a_ids=( + tuple[DynamicLiteral, ...], # type: ignore[valid-type] + Field( + default_factory=tuple, + max_length=len(agent_ids), + description="A2A agent IDs to delegate to.", + ), + ), + message=( + str, + Field( + description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation." + ), + ), + is_a2a=( + bool, + Field( + description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately." + ), + ), + __base__=BaseModel, + ) + + +def extract_a2a_agent_ids_from_config( + a2a_config: list[A2AConfig] | A2AConfig | None, +) -> tuple[list[A2AConfig], tuple[str, ...]]: + """Extract A2A agent IDs from A2A configuration. + + Args: + a2a_config: A2A configuration. + + Returns: + Tuple of A2A configs list and agent endpoint IDs. + """ + if a2a_config is None: + return [], () + + if isinstance(a2a_config, A2AConfig): + a2a_agents = [a2a_config] + else: + a2a_agents = a2a_config + return a2a_agents, tuple(config.endpoint for config in a2a_agents) + + +def get_a2a_agents_and_response_model( + a2a_config: list[A2AConfig] | A2AConfig | None, +) -> tuple[list[A2AConfig], type[BaseModel]]: + """Get A2A agent configs and response model. + + Args: + a2a_config: A2A configuration. + + Returns: + Tuple of A2A configs and response model. + """ + a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config) + + return a2a_agents, create_agent_response_model(agent_ids) diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index fc4cdc2c6..29c69ddfe 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -26,13 +26,12 @@ from crewai.a2a.templates import ( UNAVAILABLE_AGENTS_NOTICE_TEMPLATE, ) from crewai.a2a.types import AgentResponseProtocol -from crewai.a2a.utils import ( +from crewai.a2a.utils.agent_card import afetch_agent_card, fetch_agent_card +from crewai.a2a.utils.delegation import ( aexecute_a2a_delegation, - afetch_agent_card, execute_a2a_delegation, - fetch_agent_card, - get_a2a_agents_and_response_model, ) +from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model from crewai.events.event_bus import crewai_event_bus from crewai.events.types.a2a_events import ( A2AConversationCompletedEvent,