Compare commits

...

1 Commits

Author SHA1 Message Date
Devin AI
c2b151bd30 fix: handle running event loop in A2A sync wrappers (#4671)
execute_a2a_delegation() and fetch_agent_card() now use a ThreadPoolExecutor
to run async code in a separate thread when an event loop is already running
(e.g. Jupyter notebooks), instead of raising RuntimeError.

Co-Authored-By: João <joao@crewai.com>
2026-03-02 10:57:07 +00:00
4 changed files with 296 additions and 48 deletions

View File

@@ -138,12 +138,24 @@ def fetch_agent_card(
ttl_hash = int(time.time() // cache_ttl)
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
coro = afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
try:
asyncio.get_running_loop()
# Already inside an event loop (e.g. Jupyter notebook) - run the
# coroutine in a separate thread with its own event loop.
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(asyncio.run, coro)
return future.result()
except RuntimeError:
pass
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
)
return loop.run_until_complete(coro)
finally:
loop.close()

View File

@@ -158,13 +158,10 @@ def execute_a2a_delegation(
) -> TaskStateResult:
"""Execute a task delegation to a remote A2A agent synchronously.
WARNING: This function blocks the entire thread by creating and running a new
event loop. Prefer using 'await aexecute_a2a_delegation()' in async contexts
for better performance and resource efficiency.
This is a synchronous wrapper around aexecute_a2a_delegation that creates a
new event loop to run the async implementation. It is provided for compatibility
with synchronous code paths only.
This is a synchronous wrapper around aexecute_a2a_delegation. When no event
loop is running it creates a new one; when called from within an existing
event loop (e.g. Jupyter notebooks) it transparently runs the coroutine in
a separate thread to avoid "cannot run nested event loop" errors.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL).
@@ -194,51 +191,51 @@ def execute_a2a_delegation(
Returns:
TaskStateResult with status, result/error, history, and agent_card.
Raises:
RuntimeError: If called from an async context with a running event loop.
"""
coro = aexecute_a2a_delegation(
endpoint=endpoint,
auth=auth,
timeout=timeout,
task_description=task_description,
context=context,
context_id=context_id,
task_id=task_id,
reference_task_ids=reference_task_ids,
metadata=metadata,
extensions=extensions,
conversation_history=conversation_history,
agent_id=agent_id,
agent_role=agent_role,
agent_branch=agent_branch,
response_model=response_model,
turn_number=turn_number,
updates=updates,
from_task=from_task,
from_agent=from_agent,
skill_id=skill_id,
client_extensions=client_extensions,
transport=transport,
accepted_output_modes=accepted_output_modes,
input_files=input_files,
)
try:
asyncio.get_running_loop()
raise RuntimeError(
"execute_a2a_delegation() cannot be called from an async context. "
"Use 'await aexecute_a2a_delegation()' instead."
)
except RuntimeError as e:
if "no running event loop" not in str(e).lower():
raise
# Already inside an event loop (e.g. Jupyter notebook) - run the
# coroutine in a separate thread with its own event loop so we don't
# block or conflict with the running one.
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(asyncio.run, coro)
return future.result()
except RuntimeError:
pass
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
aexecute_a2a_delegation(
endpoint=endpoint,
auth=auth,
timeout=timeout,
task_description=task_description,
context=context,
context_id=context_id,
task_id=task_id,
reference_task_ids=reference_task_ids,
metadata=metadata,
extensions=extensions,
conversation_history=conversation_history,
agent_id=agent_id,
agent_role=agent_role,
agent_branch=agent_branch,
response_model=response_model,
turn_number=turn_number,
updates=updates,
from_task=from_task,
from_agent=from_agent,
skill_id=skill_id,
client_extensions=client_extensions,
transport=transport,
accepted_output_modes=accepted_output_modes,
input_files=input_files,
)
)
return loop.run_until_complete(coro)
finally:
try:
loop.run_until_complete(loop.shutdown_asyncgens())

View File

@@ -0,0 +1,110 @@
"""Tests for fetch_agent_card sync wrapper when a running event loop exists.
Covers the same class of issue as #4671 — ``fetch_agent_card()`` must work
even when called from within an already-running event loop (e.g. Jupyter).
"""
from __future__ import annotations
import asyncio
from typing import Any
from unittest.mock import AsyncMock, patch
import pytest
from a2a.types import AgentCapabilities, AgentCard
from crewai.a2a.utils.agent_card import fetch_agent_card
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mock_agent_card() -> AgentCard:
"""Return a minimal ``AgentCard`` for mocking."""
return AgentCard(
name="Test Agent",
description="A test agent",
url="http://localhost:9999",
version="1.0.0",
capabilities=AgentCapabilities(streaming=False),
default_input_modes=["text"],
default_output_modes=["text"],
skills=[],
)
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestFetchAgentCardEventLoop:
"""Verify fetch_agent_card works with and without a running loop."""
@patch("crewai.a2a.utils.agent_card.afetch_agent_card")
def test_works_without_running_event_loop(
self, mock_async_fn: AsyncMock
) -> None:
"""Normal case: no running event loop, should succeed directly."""
expected = _mock_agent_card()
mock_async_fn.return_value = expected
result = fetch_agent_card(
endpoint="http://localhost:9999",
use_cache=False,
)
assert result.name == "Test Agent"
mock_async_fn.assert_called_once()
@patch("crewai.a2a.utils.agent_card.afetch_agent_card")
def test_works_inside_running_event_loop(
self, mock_async_fn: AsyncMock
) -> None:
"""Regression test: must not raise when a loop is already running."""
expected = _mock_agent_card()
mock_async_fn.return_value = expected
result_holder: list[Any] = []
error_holder: list[Exception] = []
async def _call_sync_from_async() -> None:
try:
res = fetch_agent_card(
endpoint="http://localhost:9999",
use_cache=False,
)
result_holder.append(res)
except Exception as exc:
error_holder.append(exc)
asyncio.run(_call_sync_from_async())
assert not error_holder, f"Unexpected error: {error_holder[0]}"
assert len(result_holder) == 1
assert result_holder[0].name == "Test Agent"
@patch("crewai.a2a.utils.agent_card.afetch_agent_card")
def test_propagates_errors_inside_running_event_loop(
self, mock_async_fn: AsyncMock
) -> None:
"""Errors should propagate even when called from a running loop."""
mock_async_fn.side_effect = ConnectionError("cannot reach agent")
error_holder: list[Exception] = []
async def _call_sync_from_async() -> None:
try:
fetch_agent_card(
endpoint="http://localhost:9999",
use_cache=False,
)
except Exception as exc:
error_holder.append(exc)
asyncio.run(_call_sync_from_async())
assert len(error_holder) == 1
assert isinstance(error_holder[0], ConnectionError)
assert "cannot reach agent" in str(error_holder[0])

View File

@@ -0,0 +1,129 @@
"""Tests for A2A delegation sync wrappers when a running event loop exists.
Covers the fix for https://github.com/crewAIInc/crewAI/issues/4671 where
``execute_a2a_delegation()`` raised ``RuntimeError`` when called from an
environment that already has a running event loop (e.g. Jupyter notebooks).
"""
from __future__ import annotations
import asyncio
import threading
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.utils.delegation import execute_a2a_delegation
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _minimal_task_state_result(**overrides: Any) -> TaskStateResult:
"""Return a minimal ``TaskStateResult`` dict for mocking."""
base: TaskStateResult = {
"status": "completed",
"result": "mocked result",
"error": None,
"history": [],
"agent_card": None,
}
base.update(overrides) # type: ignore[typeddict-item]
return base
_DELEGATION_KWARGS: dict[str, Any] = dict(
endpoint="http://localhost:9999/.well-known/agent-card.json",
auth=None,
timeout=30,
task_description="test task",
)
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestExecuteA2ADelegationEventLoop:
"""Verify execute_a2a_delegation works with and without a running loop."""
@patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
def test_works_without_running_event_loop(
self, mock_async_fn: AsyncMock
) -> None:
"""Normal case: no running event loop, should succeed directly."""
expected = _minimal_task_state_result()
mock_async_fn.return_value = expected
result = execute_a2a_delegation(**_DELEGATION_KWARGS)
assert result["status"] == "completed"
assert result["result"] == "mocked result"
mock_async_fn.assert_called_once()
@patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
def test_works_inside_running_event_loop(
self, mock_async_fn: AsyncMock
) -> None:
"""Regression test for #4671: must not raise when a loop is running.
Simulates the Jupyter notebook environment by calling the sync wrapper
from within an already-running event loop.
"""
expected = _minimal_task_state_result()
mock_async_fn.return_value = expected
result_holder: list[Any] = []
error_holder: list[Exception] = []
async def _call_sync_from_async() -> None:
"""Call the sync function from within a running event loop."""
try:
# This must NOT raise RuntimeError anymore
res = execute_a2a_delegation(**_DELEGATION_KWARGS)
result_holder.append(res)
except Exception as exc:
error_holder.append(exc)
# Run inside an event loop, simulating a Jupyter notebook
asyncio.run(_call_sync_from_async())
assert not error_holder, f"Unexpected error: {error_holder[0]}"
assert len(result_holder) == 1
assert result_holder[0]["status"] == "completed"
assert result_holder[0]["result"] == "mocked result"
@patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
def test_propagates_errors_from_async_fn(
self, mock_async_fn: AsyncMock
) -> None:
"""Errors from the underlying async function should propagate."""
mock_async_fn.side_effect = ConnectionError("remote agent down")
with pytest.raises(ConnectionError, match="remote agent down"):
execute_a2a_delegation(**_DELEGATION_KWARGS)
@patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
def test_propagates_errors_inside_running_event_loop(
self, mock_async_fn: AsyncMock
) -> None:
"""Errors should propagate even when called from a running loop."""
mock_async_fn.side_effect = ConnectionError("remote agent down")
error_holder: list[Exception] = []
async def _call_sync_from_async() -> None:
try:
execute_a2a_delegation(**_DELEGATION_KWARGS)
except Exception as exc:
error_holder.append(exc)
asyncio.run(_call_sync_from_async())
assert len(error_holder) == 1
assert isinstance(error_holder[0], ConnectionError)
assert "remote agent down" in str(error_holder[0])