From c2b151bd303af3bd275a3a88bd2648f3d4c60eac Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 2 Mar 2026 10:57:07 +0000 Subject: [PATCH] fix: handle running event loop in A2A sync wrappers (#4671) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit execute_a2a_delegation() and fetch_agent_card() now use a ThreadPoolExecutor to run async code in a separate thread when an event loop is already running (e.g. Jupyter notebooks), instead of raising RuntimeError. Co-Authored-By: João --- lib/crewai/src/crewai/a2a/utils/agent_card.py | 18 ++- lib/crewai/src/crewai/a2a/utils/delegation.py | 87 ++++++------ .../a2a/utils/test_agent_card_event_loop.py | 110 +++++++++++++++ .../a2a/utils/test_delegation_event_loop.py | 129 ++++++++++++++++++ 4 files changed, 296 insertions(+), 48 deletions(-) create mode 100644 lib/crewai/tests/a2a/utils/test_agent_card_event_loop.py create mode 100644 lib/crewai/tests/a2a/utils/test_delegation_event_loop.py diff --git a/lib/crewai/src/crewai/a2a/utils/agent_card.py b/lib/crewai/src/crewai/a2a/utils/agent_card.py index c548cd1e7..3f6c0aab8 100644 --- a/lib/crewai/src/crewai/a2a/utils/agent_card.py +++ b/lib/crewai/src/crewai/a2a/utils/agent_card.py @@ -138,12 +138,24 @@ def fetch_agent_card( ttl_hash = int(time.time() // cache_ttl) return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash) + coro = afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout) + + try: + asyncio.get_running_loop() + # Already inside an event loop (e.g. Jupyter notebook) - run the + # coroutine in a separate thread with its own event loop. + import concurrent.futures + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(asyncio.run, coro) + return future.result() + except RuntimeError: + pass + loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: - return loop.run_until_complete( - afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout) - ) + return loop.run_until_complete(coro) finally: loop.close() diff --git a/lib/crewai/src/crewai/a2a/utils/delegation.py b/lib/crewai/src/crewai/a2a/utils/delegation.py index cfcf51f36..6b7b1310d 100644 --- a/lib/crewai/src/crewai/a2a/utils/delegation.py +++ b/lib/crewai/src/crewai/a2a/utils/delegation.py @@ -158,13 +158,10 @@ def execute_a2a_delegation( ) -> TaskStateResult: """Execute a task delegation to a remote A2A agent synchronously. - WARNING: This function blocks the entire thread by creating and running a new - event loop. Prefer using 'await aexecute_a2a_delegation()' in async contexts - for better performance and resource efficiency. - - This is a synchronous wrapper around aexecute_a2a_delegation that creates a - new event loop to run the async implementation. It is provided for compatibility - with synchronous code paths only. + This is a synchronous wrapper around aexecute_a2a_delegation. When no event + loop is running it creates a new one; when called from within an existing + event loop (e.g. Jupyter notebooks) it transparently runs the coroutine in + a separate thread to avoid "cannot run nested event loop" errors. Args: endpoint: A2A agent endpoint URL (AgentCard URL). @@ -194,51 +191,51 @@ def execute_a2a_delegation( Returns: TaskStateResult with status, result/error, history, and agent_card. - - Raises: - RuntimeError: If called from an async context with a running event loop. """ + coro = aexecute_a2a_delegation( + endpoint=endpoint, + auth=auth, + timeout=timeout, + task_description=task_description, + context=context, + context_id=context_id, + task_id=task_id, + reference_task_ids=reference_task_ids, + metadata=metadata, + extensions=extensions, + conversation_history=conversation_history, + agent_id=agent_id, + agent_role=agent_role, + agent_branch=agent_branch, + response_model=response_model, + turn_number=turn_number, + updates=updates, + from_task=from_task, + from_agent=from_agent, + skill_id=skill_id, + client_extensions=client_extensions, + transport=transport, + accepted_output_modes=accepted_output_modes, + input_files=input_files, + ) + try: asyncio.get_running_loop() - raise RuntimeError( - "execute_a2a_delegation() cannot be called from an async context. " - "Use 'await aexecute_a2a_delegation()' instead." - ) - except RuntimeError as e: - if "no running event loop" not in str(e).lower(): - raise + # Already inside an event loop (e.g. Jupyter notebook) - run the + # coroutine in a separate thread with its own event loop so we don't + # block or conflict with the running one. + import concurrent.futures + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(asyncio.run, coro) + return future.result() + except RuntimeError: + pass loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: - return loop.run_until_complete( - aexecute_a2a_delegation( - endpoint=endpoint, - auth=auth, - timeout=timeout, - task_description=task_description, - context=context, - context_id=context_id, - task_id=task_id, - reference_task_ids=reference_task_ids, - metadata=metadata, - extensions=extensions, - conversation_history=conversation_history, - agent_id=agent_id, - agent_role=agent_role, - agent_branch=agent_branch, - response_model=response_model, - turn_number=turn_number, - updates=updates, - from_task=from_task, - from_agent=from_agent, - skill_id=skill_id, - client_extensions=client_extensions, - transport=transport, - accepted_output_modes=accepted_output_modes, - input_files=input_files, - ) - ) + return loop.run_until_complete(coro) finally: try: loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/lib/crewai/tests/a2a/utils/test_agent_card_event_loop.py b/lib/crewai/tests/a2a/utils/test_agent_card_event_loop.py new file mode 100644 index 000000000..5303c3f77 --- /dev/null +++ b/lib/crewai/tests/a2a/utils/test_agent_card_event_loop.py @@ -0,0 +1,110 @@ +"""Tests for fetch_agent_card sync wrapper when a running event loop exists. + +Covers the same class of issue as #4671 — ``fetch_agent_card()`` must work +even when called from within an already-running event loop (e.g. Jupyter). +""" + +from __future__ import annotations + +import asyncio +from typing import Any +from unittest.mock import AsyncMock, patch + +import pytest +from a2a.types import AgentCapabilities, AgentCard + +from crewai.a2a.utils.agent_card import fetch_agent_card + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _mock_agent_card() -> AgentCard: + """Return a minimal ``AgentCard`` for mocking.""" + return AgentCard( + name="Test Agent", + description="A test agent", + url="http://localhost:9999", + version="1.0.0", + capabilities=AgentCapabilities(streaming=False), + default_input_modes=["text"], + default_output_modes=["text"], + skills=[], + ) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestFetchAgentCardEventLoop: + """Verify fetch_agent_card works with and without a running loop.""" + + @patch("crewai.a2a.utils.agent_card.afetch_agent_card") + def test_works_without_running_event_loop( + self, mock_async_fn: AsyncMock + ) -> None: + """Normal case: no running event loop, should succeed directly.""" + expected = _mock_agent_card() + mock_async_fn.return_value = expected + + result = fetch_agent_card( + endpoint="http://localhost:9999", + use_cache=False, + ) + + assert result.name == "Test Agent" + mock_async_fn.assert_called_once() + + @patch("crewai.a2a.utils.agent_card.afetch_agent_card") + def test_works_inside_running_event_loop( + self, mock_async_fn: AsyncMock + ) -> None: + """Regression test: must not raise when a loop is already running.""" + expected = _mock_agent_card() + mock_async_fn.return_value = expected + + result_holder: list[Any] = [] + error_holder: list[Exception] = [] + + async def _call_sync_from_async() -> None: + try: + res = fetch_agent_card( + endpoint="http://localhost:9999", + use_cache=False, + ) + result_holder.append(res) + except Exception as exc: + error_holder.append(exc) + + asyncio.run(_call_sync_from_async()) + + assert not error_holder, f"Unexpected error: {error_holder[0]}" + assert len(result_holder) == 1 + assert result_holder[0].name == "Test Agent" + + @patch("crewai.a2a.utils.agent_card.afetch_agent_card") + def test_propagates_errors_inside_running_event_loop( + self, mock_async_fn: AsyncMock + ) -> None: + """Errors should propagate even when called from a running loop.""" + mock_async_fn.side_effect = ConnectionError("cannot reach agent") + + error_holder: list[Exception] = [] + + async def _call_sync_from_async() -> None: + try: + fetch_agent_card( + endpoint="http://localhost:9999", + use_cache=False, + ) + except Exception as exc: + error_holder.append(exc) + + asyncio.run(_call_sync_from_async()) + + assert len(error_holder) == 1 + assert isinstance(error_holder[0], ConnectionError) + assert "cannot reach agent" in str(error_holder[0]) diff --git a/lib/crewai/tests/a2a/utils/test_delegation_event_loop.py b/lib/crewai/tests/a2a/utils/test_delegation_event_loop.py new file mode 100644 index 000000000..b2b0e85b6 --- /dev/null +++ b/lib/crewai/tests/a2a/utils/test_delegation_event_loop.py @@ -0,0 +1,129 @@ +"""Tests for A2A delegation sync wrappers when a running event loop exists. + +Covers the fix for https://github.com/crewAIInc/crewAI/issues/4671 where +``execute_a2a_delegation()`` raised ``RuntimeError`` when called from an +environment that already has a running event loop (e.g. Jupyter notebooks). +""" + +from __future__ import annotations + +import asyncio +import threading +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from crewai.a2a.task_helpers import TaskStateResult +from crewai.a2a.utils.delegation import execute_a2a_delegation + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _minimal_task_state_result(**overrides: Any) -> TaskStateResult: + """Return a minimal ``TaskStateResult`` dict for mocking.""" + base: TaskStateResult = { + "status": "completed", + "result": "mocked result", + "error": None, + "history": [], + "agent_card": None, + } + base.update(overrides) # type: ignore[typeddict-item] + return base + + +_DELEGATION_KWARGS: dict[str, Any] = dict( + endpoint="http://localhost:9999/.well-known/agent-card.json", + auth=None, + timeout=30, + task_description="test task", +) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestExecuteA2ADelegationEventLoop: + """Verify execute_a2a_delegation works with and without a running loop.""" + + @patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation") + def test_works_without_running_event_loop( + self, mock_async_fn: AsyncMock + ) -> None: + """Normal case: no running event loop, should succeed directly.""" + expected = _minimal_task_state_result() + mock_async_fn.return_value = expected + + result = execute_a2a_delegation(**_DELEGATION_KWARGS) + + assert result["status"] == "completed" + assert result["result"] == "mocked result" + mock_async_fn.assert_called_once() + + @patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation") + def test_works_inside_running_event_loop( + self, mock_async_fn: AsyncMock + ) -> None: + """Regression test for #4671: must not raise when a loop is running. + + Simulates the Jupyter notebook environment by calling the sync wrapper + from within an already-running event loop. + """ + expected = _minimal_task_state_result() + mock_async_fn.return_value = expected + + result_holder: list[Any] = [] + error_holder: list[Exception] = [] + + async def _call_sync_from_async() -> None: + """Call the sync function from within a running event loop.""" + try: + # This must NOT raise RuntimeError anymore + res = execute_a2a_delegation(**_DELEGATION_KWARGS) + result_holder.append(res) + except Exception as exc: + error_holder.append(exc) + + # Run inside an event loop, simulating a Jupyter notebook + asyncio.run(_call_sync_from_async()) + + assert not error_holder, f"Unexpected error: {error_holder[0]}" + assert len(result_holder) == 1 + assert result_holder[0]["status"] == "completed" + assert result_holder[0]["result"] == "mocked result" + + @patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation") + def test_propagates_errors_from_async_fn( + self, mock_async_fn: AsyncMock + ) -> None: + """Errors from the underlying async function should propagate.""" + mock_async_fn.side_effect = ConnectionError("remote agent down") + + with pytest.raises(ConnectionError, match="remote agent down"): + execute_a2a_delegation(**_DELEGATION_KWARGS) + + @patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation") + def test_propagates_errors_inside_running_event_loop( + self, mock_async_fn: AsyncMock + ) -> None: + """Errors should propagate even when called from a running loop.""" + mock_async_fn.side_effect = ConnectionError("remote agent down") + + error_holder: list[Exception] = [] + + async def _call_sync_from_async() -> None: + try: + execute_a2a_delegation(**_DELEGATION_KWARGS) + except Exception as exc: + error_holder.append(exc) + + asyncio.run(_call_sync_from_async()) + + assert len(error_holder) == 1 + assert isinstance(error_holder[0], ConnectionError) + assert "remote agent down" in str(error_holder[0])