diff --git a/lib/crewai/src/crewai/a2a/utils.py b/lib/crewai/src/crewai/a2a/utils.py index 4bbadc00c..ad2209b05 100644 --- a/lib/crewai/src/crewai/a2a/utils.py +++ b/lib/crewai/src/crewai/a2a/utils.py @@ -134,6 +134,52 @@ def fetch_agent_card( loop.close() +async def afetch_agent_card( + endpoint: str, + auth: AuthScheme | None = None, + timeout: int = 30, + use_cache: bool = True, +) -> AgentCard: + """Async version of fetch_agent_card for use in async contexts. + + Fetches AgentCard from an A2A endpoint with optional caching. + This version should be used when calling from an async context + to avoid creating a new event loop. + + Args: + endpoint: A2A agent endpoint URL (AgentCard URL) + auth: Optional AuthScheme for authentication + timeout: Request timeout in seconds + use_cache: Whether to use caching (default True) + + Returns: + AgentCard object with agent capabilities and skills + + Raises: + httpx.HTTPStatusError: If the request fails + A2AClientHTTPError: If authentication fails + """ + if use_cache: + if auth: + auth_data = auth.model_dump_json( + exclude={ + "_access_token", + "_token_expires_at", + "_refresh_token", + "_authorization_callback", + } + ) + auth_hash = hash((type(auth).__name__, auth_data)) + else: + auth_hash = 0 + _auth_store[auth_hash] = auth + return await _fetch_agent_card_async_cached( + endpoint=endpoint, auth_hash=auth_hash, timeout=timeout + ) + + return await _fetch_agent_card_async(endpoint=endpoint, auth=auth, timeout=timeout) + + @cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator] async def _fetch_agent_card_async_cached( endpoint: str, @@ -329,6 +375,114 @@ def execute_a2a_delegation( loop.close() +async def aexecute_a2a_delegation( + endpoint: str, + auth: AuthScheme | None, + timeout: int, + task_description: str, + context: str | None = None, + context_id: str | None = None, + task_id: str | None = None, + reference_task_ids: list[str] | None = None, + metadata: dict[str, Any] | None = None, + extensions: dict[str, Any] | None = None, + conversation_history: list[Message] | None = None, + agent_id: str | None = None, + agent_role: Role | None = None, + agent_branch: Any | None = None, + response_model: type[BaseModel] | None = None, + turn_number: int | None = None, +) -> dict[str, Any]: + """Async version of execute_a2a_delegation for use in async contexts. + + Execute a task delegation to a remote A2A agent with multi-turn support. + This version should be used when calling from an async context + to avoid creating a new event loop. + + Handles: + - AgentCard discovery + - Authentication setup + - Message creation and sending + - Response parsing + - Multi-turn conversations + + Args: + endpoint: A2A agent endpoint URL (AgentCard URL) + auth: Optional AuthScheme for authentication (Bearer, OAuth2, API Key, HTTP Basic/Digest) + timeout: Request timeout in seconds + task_description: The task to delegate + context: Optional context information + context_id: Context ID for correlating messages/tasks + task_id: Specific task identifier + reference_task_ids: List of related task IDs + metadata: Additional metadata (external_id, request_id, etc.) + extensions: Protocol extensions for custom fields + conversation_history: Previous Message objects from conversation + agent_id: Agent identifier for logging + agent_role: Role of the CrewAI agent delegating the task + agent_branch: Optional agent tree branch for logging + response_model: Optional Pydantic model for structured outputs + turn_number: Optional turn number for multi-turn conversations + + Returns: + Dictionary with: + - status: "completed", "input_required", "failed", etc. + - result: Result string (if completed) + - error: Error message (if failed) + - history: List of new Message objects from this exchange + + Raises: + ImportError: If a2a-sdk is not installed + """ + is_multiturn = bool(conversation_history and len(conversation_history) > 0) + if turn_number is None: + turn_number = ( + len([m for m in (conversation_history or []) if m.role == Role.user]) + 1 + ) + crewai_event_bus.emit( + agent_branch, + A2ADelegationStartedEvent( + endpoint=endpoint, + task_description=task_description, + agent_id=agent_id, + is_multiturn=is_multiturn, + turn_number=turn_number, + ), + ) + + result = await _execute_a2a_delegation_async( + endpoint=endpoint, + auth=auth, + timeout=timeout, + task_description=task_description, + context=context, + context_id=context_id, + task_id=task_id, + reference_task_ids=reference_task_ids, + metadata=metadata, + extensions=extensions, + conversation_history=conversation_history or [], + is_multiturn=is_multiturn, + turn_number=turn_number, + agent_branch=agent_branch, + agent_id=agent_id, + agent_role=agent_role, + response_model=response_model, + ) + + crewai_event_bus.emit( + agent_branch, + A2ADelegationCompletedEvent( + status=result["status"], + result=result.get("result"), + error=result.get("error"), + is_multiturn=is_multiturn, + ), + ) + + return result + + async def _execute_a2a_delegation_async( endpoint: str, auth: AuthScheme | None, diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index 4c98e6f30..7a2c8f3ed 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -5,7 +5,8 @@ Wraps agent classes with A2A delegation capabilities. from __future__ import annotations -from collections.abc import Callable +import asyncio +from collections.abc import Callable, Coroutine from concurrent.futures import ThreadPoolExecutor, as_completed from functools import wraps from types import MethodType @@ -24,6 +25,8 @@ from crewai.a2a.templates import ( ) from crewai.a2a.types import AgentResponseProtocol from crewai.a2a.utils import ( + aexecute_a2a_delegation, + afetch_agent_card, execute_a2a_delegation, fetch_agent_card, get_a2a_agents_and_response_model, @@ -46,11 +49,11 @@ if TYPE_CHECKING: def wrap_agent_with_a2a_instance( agent: Agent, extension_registry: ExtensionRegistry | None = None ) -> None: - """Wrap an agent instance's execute_task method with A2A support. + """Wrap an agent instance's execute_task and aexecute_task methods with A2A support. This function modifies the agent instance by wrapping its execute_task - method to add A2A delegation capabilities. Should only be called when - the agent has a2a configuration set. + and aexecute_task methods to add A2A delegation capabilities. Should only + be called when the agent has a2a configuration set. Args: agent: The agent instance to wrap @@ -99,6 +102,49 @@ def wrap_agent_with_a2a_instance( object.__setattr__(agent, "execute_task", MethodType(execute_task_with_a2a, agent)) + original_aexecute_task = agent.aexecute_task.__func__ # type: ignore[attr-defined] + + @wraps(original_aexecute_task) + async def aexecute_task_with_a2a( + self: Agent, + task: Task, + context: str | None = None, + tools: list[BaseTool] | None = None, + ) -> str: + """Execute task asynchronously with A2A delegation support. + + This async version should be used when calling from an async context + to avoid creating a new event loop. + + Args: + self: The agent instance + task: The task to execute + context: Optional context for task execution + tools: Optional tools available to the agent + + Returns: + Task execution result + """ + if not self.a2a: + return await original_aexecute_task(self, task, context, tools) # type: ignore[no-any-return] + + a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) + + return await _aexecute_task_with_a2a( + self=self, + a2a_agents=a2a_agents, + original_fn=original_aexecute_task, + task=task, + agent_response_model=agent_response_model, + context=context, + tools=tools, + extension_registry=extension_registry, + ) + + object.__setattr__( + agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent) + ) + def _fetch_card_from_config( config: A2AConfig, @@ -701,3 +747,497 @@ def _delegate_to_a2a( finally: task.description = original_task_description + + +async def _afetch_card_from_config( + config: A2AConfig, +) -> tuple[A2AConfig, AgentCard | Exception]: + """Async version of _fetch_card_from_config. + + Fetch agent card from A2A config asynchronously. + + Args: + config: A2A configuration + + Returns: + Tuple of (config, card or exception) + """ + try: + card = await afetch_agent_card( + endpoint=config.endpoint, + auth=config.auth, + timeout=config.timeout, + ) + return config, card + except Exception as e: + return config, e + + +async def _afetch_agent_cards_concurrently( + a2a_agents: list[A2AConfig], +) -> tuple[dict[str, AgentCard], dict[str, str]]: + """Async version of _fetch_agent_cards_concurrently. + + Fetch agent cards concurrently for multiple A2A agents using asyncio.gather. + + Args: + a2a_agents: List of A2A agent configurations + + Returns: + Tuple of (agent_cards dict, failed_agents dict mapping endpoint to error message) + """ + agent_cards: dict[str, AgentCard] = {} + failed_agents: dict[str, str] = {} + + if not a2a_agents: + return agent_cards, failed_agents + + results = await asyncio.gather( + *[_afetch_card_from_config(config) for config in a2a_agents], + return_exceptions=False, + ) + + for config, result in results: + if isinstance(result, Exception): + if config.fail_fast: + raise RuntimeError( + f"Failed to fetch agent card from {config.endpoint}. " + f"Ensure the A2A agent is running and accessible. Error: {result}" + ) from result + failed_agents[config.endpoint] = str(result) + else: + agent_cards[config.endpoint] = result + + return agent_cards, failed_agents + + +async def _aexecute_task_with_a2a( + self: Agent, + a2a_agents: list[A2AConfig], + original_fn: Callable[..., Coroutine[Any, Any, str]], + task: Task, + agent_response_model: type[BaseModel], + context: str | None, + tools: list[BaseTool] | None, + extension_registry: ExtensionRegistry, +) -> str: + """Async version of _execute_task_with_a2a. + + Wrap aexecute_task with A2A delegation logic. + + Args: + self: The agent instance + a2a_agents: Dictionary of A2A agent configurations + original_fn: The original aexecute_task method + task: The task to execute + context: Optional context for task execution + tools: Optional tools available to the agent + agent_response_model: Optional agent response model + extension_registry: Registry of A2A extensions + + Returns: + Task execution result (either from LLM or A2A agent) + """ + original_description: str = task.description + original_output_pydantic = task.output_pydantic + original_response_model = task.response_model + + agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_agents) + + if not agent_cards and a2a_agents and failed_agents: + unavailable_agents_text = "" + for endpoint, error in failed_agents.items(): + unavailable_agents_text += f" - {endpoint}: {error}\n" + + notice = UNAVAILABLE_AGENTS_NOTICE_TEMPLATE.substitute( + unavailable_agents=unavailable_agents_text + ) + task.description = f"{original_description}{notice}" + + try: + return await original_fn(self, task, context, tools) + finally: + task.description = original_description + + task.description, _ = _augment_prompt_with_a2a( + a2a_agents=a2a_agents, + task_description=original_description, + agent_cards=agent_cards, + failed_agents=failed_agents, + extension_registry=extension_registry, + ) + task.response_model = agent_response_model + + try: + raw_result = await original_fn(self, task, context, tools) + agent_response = _parse_agent_response( + raw_result=raw_result, agent_response_model=agent_response_model + ) + + if extension_registry and isinstance(agent_response, BaseModel): + agent_response = extension_registry.process_response_with_all( + agent_response, {} + ) + + if isinstance(agent_response, BaseModel) and isinstance( + agent_response, AgentResponseProtocol + ): + if agent_response.is_a2a: + return await _adelegate_to_a2a( + self, + agent_response=agent_response, + task=task, + original_fn=original_fn, + context=context, + tools=tools, + agent_cards=agent_cards, + original_task_description=original_description, + extension_registry=extension_registry, + ) + return str(agent_response.message) + + return raw_result + finally: + task.description = original_description + task.output_pydantic = original_output_pydantic + task.response_model = original_response_model + + +async def _ahandle_agent_response_and_continue( + self: Agent, + a2a_result: dict[str, Any], + agent_id: str, + agent_cards: dict[str, AgentCard] | None, + a2a_agents: list[A2AConfig], + original_task_description: str, + conversation_history: list[Message], + turn_num: int, + max_turns: int, + task: Task, + original_fn: Callable[..., Coroutine[Any, Any, str]], + context: str | None, + tools: list[BaseTool] | None, + agent_response_model: type[BaseModel], +) -> tuple[str | None, str | None]: + """Async version of _handle_agent_response_and_continue. + + Handle A2A result and get CrewAI agent's response asynchronously. + + Args: + self: The agent instance + a2a_result: Result from A2A delegation + agent_id: ID of the A2A agent + agent_cards: Pre-fetched agent cards + a2a_agents: List of A2A configurations + original_task_description: Original task description + conversation_history: Conversation history + turn_num: Current turn number + max_turns: Maximum turns allowed + task: The task being executed + original_fn: Original aexecute_task method + context: Optional context + tools: Optional tools + agent_response_model: Response model for parsing + + Returns: + Tuple of (final_result, current_request) where: + - final_result is not None if conversation should end + - current_request is the next message to send if continuing + """ + agent_cards_dict = agent_cards or {} + if "agent_card" in a2a_result and agent_id not in agent_cards_dict: + agent_cards_dict[agent_id] = a2a_result["agent_card"] + + task.description, disable_structured_output = _augment_prompt_with_a2a( + a2a_agents=a2a_agents, + task_description=original_task_description, + conversation_history=conversation_history, + turn_num=turn_num, + max_turns=max_turns, + agent_cards=agent_cards_dict, + ) + + original_response_model = task.response_model + if disable_structured_output: + task.response_model = None + + raw_result = await original_fn(self, task, context, tools) + + if disable_structured_output: + task.response_model = original_response_model + + if disable_structured_output: + final_turn_number = turn_num + 1 + result_text = str(raw_result) + crewai_event_bus.emit( + None, + A2AMessageSentEvent( + message=result_text, + turn_number=final_turn_number, + is_multiturn=True, + agent_role=self.role, + ), + ) + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="completed", + final_result=result_text, + error=None, + total_turns=final_turn_number, + ), + ) + return result_text, None + + llm_response = _parse_agent_response( + raw_result=raw_result, agent_response_model=agent_response_model + ) + + if isinstance(llm_response, BaseModel) and isinstance( + llm_response, AgentResponseProtocol + ): + if not llm_response.is_a2a: + final_turn_number = turn_num + 1 + crewai_event_bus.emit( + None, + A2AMessageSentEvent( + message=str(llm_response.message), + turn_number=final_turn_number, + is_multiturn=True, + agent_role=self.role, + ), + ) + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="completed", + final_result=str(llm_response.message), + error=None, + total_turns=final_turn_number, + ), + ) + return str(llm_response.message), None + return None, str(llm_response.message) + + return str(raw_result), None + + +async def _adelegate_to_a2a( + self: Agent, + agent_response: AgentResponseProtocol, + task: Task, + original_fn: Callable[..., Coroutine[Any, Any, str]], + context: str | None, + tools: list[BaseTool] | None, + agent_cards: dict[str, AgentCard] | None = None, + original_task_description: str | None = None, + extension_registry: ExtensionRegistry | None = None, +) -> str: + """Async version of _delegate_to_a2a. + + Delegate to A2A agent with multi-turn conversation support asynchronously. + + Args: + self: The agent instance + agent_response: The AgentResponse indicating delegation + task: The task being executed (for extracting A2A fields) + original_fn: The original aexecute_task method for follow-ups + context: Optional context for task execution + tools: Optional tools available to the agent + agent_cards: Pre-fetched agent cards from _aexecute_task_with_a2a + original_task_description: The original task description before A2A augmentation + extension_registry: Optional registry of A2A extensions + + Returns: + Result from A2A agent + + Raises: + ImportError: If a2a-sdk is not installed + """ + a2a_agents, agent_response_model = get_a2a_agents_and_response_model(self.a2a) + agent_ids = tuple(config.endpoint for config in a2a_agents) + current_request = str(agent_response.message) + + if hasattr(agent_response, "a2a_ids") and agent_response.a2a_ids: + agent_id = agent_response.a2a_ids[0] + else: + agent_id = agent_ids[0] if agent_ids else "" + + if agent_id and agent_id not in agent_ids: + raise ValueError( + f"Unknown A2A agent ID(s): {agent_response.a2a_ids} not in {agent_ids}" + ) + + agent_config = next(filter(lambda x: x.endpoint == agent_id, a2a_agents)) + task_config = task.config or {} + context_id = task_config.get("context_id") + task_id_config = task_config.get("task_id") + metadata = task_config.get("metadata") + extensions = task_config.get("extensions") + + reference_task_ids = task_config.get("reference_task_ids", []) + + if original_task_description is None: + original_task_description = task.description + + conversation_history: list[Message] = [] + max_turns = agent_config.max_turns + + try: + for turn_num in range(max_turns): + console_formatter = getattr(crewai_event_bus, "_console", None) + agent_branch = None + if console_formatter: + agent_branch = getattr( + console_formatter, "current_agent_branch", None + ) or getattr(console_formatter, "current_task_branch", None) + + a2a_result = await aexecute_a2a_delegation( + endpoint=agent_config.endpoint, + auth=agent_config.auth, + timeout=agent_config.timeout, + task_description=current_request, + context_id=context_id, + task_id=task_id_config, + reference_task_ids=reference_task_ids, + metadata=metadata, + extensions=extensions, + conversation_history=conversation_history, + agent_id=agent_id, + agent_role=Role.user, + agent_branch=agent_branch, + response_model=agent_config.response_model, + turn_number=turn_num + 1, + ) + + conversation_history = a2a_result.get("history", []) + + if conversation_history: + latest_message = conversation_history[-1] + if latest_message.task_id is not None: + task_id_config = latest_message.task_id + if latest_message.context_id is not None: + context_id = latest_message.context_id + + if a2a_result["status"] in ["completed", "input_required"]: + if ( + a2a_result["status"] == "completed" + and agent_config.trust_remote_completion_status + ): + if ( + task_id_config is not None + and task_id_config not in reference_task_ids + ): + reference_task_ids.append(task_id_config) + if task.config is None: + task.config = {} + task.config["reference_task_ids"] = reference_task_ids + + result_text = a2a_result.get("result", "") + final_turn_number = turn_num + 1 + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="completed", + final_result=result_text, + error=None, + total_turns=final_turn_number, + ), + ) + return cast(str, result_text) + + final_result, next_request = await _ahandle_agent_response_and_continue( + self=self, + a2a_result=a2a_result, + agent_id=agent_id, + agent_cards=agent_cards, + a2a_agents=a2a_agents, + original_task_description=original_task_description, + conversation_history=conversation_history, + turn_num=turn_num, + max_turns=max_turns, + task=task, + original_fn=original_fn, + context=context, + tools=tools, + agent_response_model=agent_response_model, + ) + + if final_result is not None: + return final_result + + if next_request is not None: + current_request = next_request + + continue + + error_msg = a2a_result.get("error", "Unknown error") + + final_result, next_request = await _ahandle_agent_response_and_continue( + self=self, + a2a_result=a2a_result, + agent_id=agent_id, + agent_cards=agent_cards, + a2a_agents=a2a_agents, + original_task_description=original_task_description, + conversation_history=conversation_history, + turn_num=turn_num, + max_turns=max_turns, + task=task, + original_fn=original_fn, + context=context, + tools=tools, + agent_response_model=agent_response_model, + ) + + if final_result is not None: + return final_result + + if next_request is not None: + current_request = next_request + continue + + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="failed", + final_result=None, + error=error_msg, + total_turns=turn_num + 1, + ), + ) + return f"A2A delegation failed: {error_msg}" + + if conversation_history: + for msg in reversed(conversation_history): + if msg.role == Role.agent: + text_parts = [ + part.root.text for part in msg.parts if part.root.kind == "text" + ] + final_message = ( + " ".join(text_parts) if text_parts else "Conversation completed" + ) + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="completed", + final_result=final_message, + error=None, + total_turns=max_turns, + ), + ) + return final_message + + crewai_event_bus.emit( + None, + A2AConversationCompletedEvent( + status="failed", + final_result=None, + error=f"Conversation exceeded maximum turns ({max_turns})", + total_turns=max_turns, + ), + ) + raise Exception(f"A2A conversation exceeded maximum turns ({max_turns})") + + finally: + task.description = original_task_description diff --git a/lib/crewai/tests/agents/test_a2a_async_execution.py b/lib/crewai/tests/agents/test_a2a_async_execution.py new file mode 100644 index 000000000..6406558f2 --- /dev/null +++ b/lib/crewai/tests/agents/test_a2a_async_execution.py @@ -0,0 +1,294 @@ +"""Test A2A async execution support. + +Tests that verify async execution works correctly without creating new event loops. +""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from crewai import Agent +from crewai.a2a.config import A2AConfig + +try: + from a2a.types import Message, Role + + A2A_SDK_INSTALLED = True +except ImportError: + A2A_SDK_INSTALLED = False + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +def test_agent_with_a2a_has_async_wrapper(): + """Verify that agents with a2a get the async wrapper applied to aexecute_task.""" + a2a_config = A2AConfig( + endpoint="http://test-endpoint.com", + ) + + agent = Agent( + role="test role", + goal="test goal", + backstory="test backstory", + a2a=a2a_config, + ) + + assert agent.a2a is not None + assert callable(agent.aexecute_task) + assert hasattr(agent.aexecute_task, "__wrapped__") + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +def test_async_wrapper_is_applied_differently_per_instance(): + """Verify that agents with and without a2a have different aexecute_task methods.""" + agent_without_a2a = Agent( + role="agent without a2a", + goal="test goal", + backstory="test backstory", + ) + + a2a_config = A2AConfig(endpoint="http://test-endpoint.com") + agent_with_a2a = Agent( + role="agent with a2a", + goal="test goal", + backstory="test backstory", + a2a=a2a_config, + ) + + assert ( + agent_without_a2a.aexecute_task.__func__ + is not agent_with_a2a.aexecute_task.__func__ + ) + assert not hasattr(agent_without_a2a.aexecute_task, "__wrapped__") + assert hasattr(agent_with_a2a.aexecute_task, "__wrapped__") + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +@pytest.mark.asyncio +async def test_async_delegate_to_a2a_does_not_create_new_event_loop(): + """Verify that async A2A delegation doesn't create a new event loop.""" + from crewai.a2a.wrapper import _adelegate_to_a2a + from crewai import Task + + a2a_config = A2AConfig( + endpoint="http://test-endpoint.com", + trust_remote_completion_status=True, + ) + + agent = Agent( + role="test manager", + goal="coordinate", + backstory="test", + a2a=a2a_config, + ) + + task = Task(description="test", expected_output="test", agent=agent) + + class MockResponse: + is_a2a = True + message = "Please help" + a2a_ids = ["http://test-endpoint.com/"] + + async def mock_original_fn(self, task, context, tools): + return '{"is_a2a": false, "message": "Done", "a2a_ids": []}' + + with ( + patch("crewai.a2a.wrapper.aexecute_a2a_delegation") as mock_execute, + patch("crewai.a2a.wrapper._afetch_agent_cards_concurrently") as mock_fetch, + patch("asyncio.new_event_loop") as mock_new_loop, + ): + mock_card = MagicMock() + mock_card.name = "Test" + mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {}) + + mock_execute.return_value = { + "status": "completed", + "result": "Done by remote", + "history": [], + } + + result = await _adelegate_to_a2a( + self=agent, + agent_response=MockResponse(), + task=task, + original_fn=mock_original_fn, + context=None, + tools=None, + agent_cards={"http://test-endpoint.com/": mock_card}, + original_task_description="test", + ) + + assert result == "Done by remote" + mock_new_loop.assert_not_called() + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +@pytest.mark.asyncio +async def test_aexecute_a2a_delegation_does_not_create_new_event_loop(): + """Verify that aexecute_a2a_delegation doesn't create a new event loop.""" + from crewai.a2a.utils import aexecute_a2a_delegation + + with ( + patch( + "crewai.a2a.utils._execute_a2a_delegation_async" + ) as mock_execute_async, + patch("asyncio.new_event_loop") as mock_new_loop, + ): + mock_execute_async.return_value = { + "status": "completed", + "result": "Done", + "history": [], + } + + result = await aexecute_a2a_delegation( + endpoint="http://test-endpoint.com", + auth=None, + timeout=30, + task_description="test task", + agent_id="test-agent", + ) + + assert result["status"] == "completed" + mock_new_loop.assert_not_called() + mock_execute_async.assert_called_once() + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +@pytest.mark.asyncio +async def test_afetch_agent_card_does_not_create_new_event_loop(): + """Verify that afetch_agent_card doesn't create a new event loop.""" + from crewai.a2a.utils import afetch_agent_card + + with ( + patch("crewai.a2a.utils._fetch_agent_card_async") as mock_fetch_async, + patch("asyncio.new_event_loop") as mock_new_loop, + ): + mock_card = MagicMock() + mock_card.name = "Test Agent" + mock_fetch_async.return_value = mock_card + + result = await afetch_agent_card( + endpoint="http://test-endpoint.com", + auth=None, + timeout=30, + use_cache=False, + ) + + assert result.name == "Test Agent" + mock_new_loop.assert_not_called() + mock_fetch_async.assert_called_once() + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +@pytest.mark.asyncio +async def test_afetch_agent_cards_concurrently(): + """Verify that _afetch_agent_cards_concurrently fetches cards using asyncio.gather.""" + from crewai.a2a.wrapper import _afetch_agent_cards_concurrently + + a2a_configs = [ + A2AConfig(endpoint="http://test-endpoint-1.com"), + A2AConfig(endpoint="http://test-endpoint-2.com"), + ] + + with patch("crewai.a2a.wrapper.afetch_agent_card") as mock_fetch: + mock_card1 = MagicMock() + mock_card1.name = "Agent 1" + mock_card2 = MagicMock() + mock_card2.name = "Agent 2" + + async def side_effect(endpoint, auth, timeout): + if "endpoint-1" in endpoint: + return mock_card1 + return mock_card2 + + mock_fetch.side_effect = side_effect + + agent_cards, failed_agents = await _afetch_agent_cards_concurrently(a2a_configs) + + assert len(agent_cards) == 2 + assert len(failed_agents) == 0 + assert mock_fetch.call_count == 2 + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +@pytest.mark.asyncio +async def test_aexecute_task_with_a2a_uses_async_path(): + """Verify that _aexecute_task_with_a2a uses the async delegation path.""" + from crewai.a2a.wrapper import _aexecute_task_with_a2a + from crewai.a2a.utils import get_a2a_agents_and_response_model + from crewai import Task + + a2a_config = A2AConfig( + endpoint="http://test-endpoint.com", + ) + + agent = Agent( + role="test role", + goal="test goal", + backstory="test backstory", + a2a=a2a_config, + ) + + task = Task(description="test task", expected_output="test output", agent=agent) + + a2a_agents, agent_response_model = get_a2a_agents_and_response_model(a2a_config) + + async def mock_original_fn(self, task, context, tools): + return '{"is_a2a": false, "message": "Direct response", "a2a_ids": []}' + + with ( + patch("crewai.a2a.wrapper._afetch_agent_cards_concurrently") as mock_fetch, + ): + mock_card = MagicMock() + mock_card.name = "Test" + mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {}) + + from crewai.a2a.extensions.base import ExtensionRegistry + + result = await _aexecute_task_with_a2a( + self=agent, + a2a_agents=a2a_agents, + original_fn=mock_original_fn, + task=task, + agent_response_model=agent_response_model, + context=None, + tools=None, + extension_registry=ExtensionRegistry(), + ) + + assert result == "Direct response" + mock_fetch.assert_called_once() + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +@pytest.mark.asyncio +async def test_async_execution_in_running_event_loop(): + """Verify that async A2A execution works correctly within a running event loop. + + This test simulates the scenario described in issue #4162 where A2A is called + from an async context that already has a running event loop. + """ + from crewai.a2a.utils import aexecute_a2a_delegation + + current_loop = asyncio.get_running_loop() + assert current_loop is not None + + with patch( + "crewai.a2a.utils._execute_a2a_delegation_async" + ) as mock_execute_async: + mock_execute_async.return_value = { + "status": "completed", + "result": "Success from async context", + "history": [], + } + + result = await aexecute_a2a_delegation( + endpoint="http://test-endpoint.com", + auth=None, + timeout=30, + task_description="test task from async context", + agent_id="test-agent", + ) + + assert result["status"] == "completed" + assert result["result"] == "Success from async context"