From 5d0811258fc12fc7828a73b9ccd5f03305fcacaf Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Tue, 3 Mar 2026 10:05:48 -0500 Subject: [PATCH] fix(a2a): support Jupyter environments with running event loops --- lib/crewai/src/crewai/a2a/utils/agent_card.py | 35 ++++---- lib/crewai/src/crewai/a2a/utils/delegation.py | 80 ++++++++----------- 2 files changed, 55 insertions(+), 60 deletions(-) diff --git a/lib/crewai/src/crewai/a2a/utils/agent_card.py b/lib/crewai/src/crewai/a2a/utils/agent_card.py index c548cd1e7..45819bebd 100644 --- a/lib/crewai/src/crewai/a2a/utils/agent_card.py +++ b/lib/crewai/src/crewai/a2a/utils/agent_card.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio from collections.abc import MutableMapping +import concurrent.futures from functools import lru_cache import ssl import time @@ -138,14 +139,17 @@ def fetch_agent_card( ttl_hash = int(time.time() // cache_ttl) return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + coro = afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout) try: - return loop.run_until_complete( - afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout) - ) - finally: - loop.close() + asyncio.get_running_loop() + has_running_loop = True + except RuntimeError: + has_running_loop = False + + if has_running_loop: + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + return pool.submit(asyncio.run, coro).result() + return asyncio.run(coro) async def afetch_agent_card( @@ -203,14 +207,17 @@ def _fetch_agent_card_cached( """Cached sync version of fetch_agent_card.""" auth = _auth_store.get(auth_hash) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + coro = _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) try: - return loop.run_until_complete( - _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout) - ) - finally: - loop.close() + asyncio.get_running_loop() + has_running_loop = True + except RuntimeError: + has_running_loop = False + + if has_running_loop: + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + return pool.submit(asyncio.run, coro).result() + return asyncio.run(coro) @cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator] diff --git a/lib/crewai/src/crewai/a2a/utils/delegation.py b/lib/crewai/src/crewai/a2a/utils/delegation.py index cfcf51f36..3a6795c34 100644 --- a/lib/crewai/src/crewai/a2a/utils/delegation.py +++ b/lib/crewai/src/crewai/a2a/utils/delegation.py @@ -5,6 +5,7 @@ from __future__ import annotations import asyncio import base64 from collections.abc import AsyncIterator, Callable, MutableMapping +import concurrent.futures from contextlib import asynccontextmanager import logging from typing import TYPE_CHECKING, Any, Final, Literal @@ -194,56 +195,43 @@ def execute_a2a_delegation( Returns: TaskStateResult with status, result/error, history, and agent_card. - - Raises: - RuntimeError: If called from an async context with a running event loop. """ + coro = aexecute_a2a_delegation( + endpoint=endpoint, + auth=auth, + timeout=timeout, + task_description=task_description, + context=context, + context_id=context_id, + task_id=task_id, + reference_task_ids=reference_task_ids, + metadata=metadata, + extensions=extensions, + conversation_history=conversation_history, + agent_id=agent_id, + agent_role=agent_role, + agent_branch=agent_branch, + response_model=response_model, + turn_number=turn_number, + updates=updates, + from_task=from_task, + from_agent=from_agent, + skill_id=skill_id, + client_extensions=client_extensions, + transport=transport, + accepted_output_modes=accepted_output_modes, + input_files=input_files, + ) try: asyncio.get_running_loop() - raise RuntimeError( - "execute_a2a_delegation() cannot be called from an async context. " - "Use 'await aexecute_a2a_delegation()' instead." - ) - except RuntimeError as e: - if "no running event loop" not in str(e).lower(): - raise + has_running_loop = True + except RuntimeError: + has_running_loop = False - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete( - aexecute_a2a_delegation( - endpoint=endpoint, - auth=auth, - timeout=timeout, - task_description=task_description, - context=context, - context_id=context_id, - task_id=task_id, - reference_task_ids=reference_task_ids, - metadata=metadata, - extensions=extensions, - conversation_history=conversation_history, - agent_id=agent_id, - agent_role=agent_role, - agent_branch=agent_branch, - response_model=response_model, - turn_number=turn_number, - updates=updates, - from_task=from_task, - from_agent=from_agent, - skill_id=skill_id, - client_extensions=client_extensions, - transport=transport, - accepted_output_modes=accepted_output_modes, - input_files=input_files, - ) - ) - finally: - try: - loop.run_until_complete(loop.shutdown_asyncgens()) - finally: - loop.close() + if has_running_loop: + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool: + return pool.submit(asyncio.run, coro).result() + return asyncio.run(coro) async def aexecute_a2a_delegation(