mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-11 21:42:36 +00:00
fix(a2a): support Jupyter environments with running event loops
This commit is contained in:
@@ -4,6 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import MutableMapping
|
||||
import concurrent.futures
|
||||
from functools import lru_cache
|
||||
import ssl
|
||||
import time
|
||||
@@ -138,14 +139,17 @@ def fetch_agent_card(
|
||||
ttl_hash = int(time.time() // cache_ttl)
|
||||
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
coro = afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
|
||||
try:
|
||||
return loop.run_until_complete(
|
||||
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
|
||||
)
|
||||
finally:
|
||||
loop.close()
|
||||
asyncio.get_running_loop()
|
||||
has_running_loop = True
|
||||
except RuntimeError:
|
||||
has_running_loop = False
|
||||
|
||||
if has_running_loop:
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
|
||||
return pool.submit(asyncio.run, coro).result()
|
||||
return asyncio.run(coro)
|
||||
|
||||
|
||||
async def afetch_agent_card(
|
||||
@@ -203,14 +207,17 @@ def _fetch_agent_card_cached(
|
||||
"""Cached sync version of fetch_agent_card."""
|
||||
auth = _auth_store.get(auth_hash)
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
coro = _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
|
||||
try:
|
||||
return loop.run_until_complete(
|
||||
_afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
|
||||
)
|
||||
finally:
|
||||
loop.close()
|
||||
asyncio.get_running_loop()
|
||||
has_running_loop = True
|
||||
except RuntimeError:
|
||||
has_running_loop = False
|
||||
|
||||
if has_running_loop:
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
|
||||
return pool.submit(asyncio.run, coro).result()
|
||||
return asyncio.run(coro)
|
||||
|
||||
|
||||
@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator]
|
||||
|
||||
@@ -5,6 +5,7 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import base64
|
||||
from collections.abc import AsyncIterator, Callable, MutableMapping
|
||||
import concurrent.futures
|
||||
from contextlib import asynccontextmanager
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, Final, Literal
|
||||
@@ -194,56 +195,43 @@ def execute_a2a_delegation(
|
||||
|
||||
Returns:
|
||||
TaskStateResult with status, result/error, history, and agent_card.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If called from an async context with a running event loop.
|
||||
"""
|
||||
coro = aexecute_a2a_delegation(
|
||||
endpoint=endpoint,
|
||||
auth=auth,
|
||||
timeout=timeout,
|
||||
task_description=task_description,
|
||||
context=context,
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
reference_task_ids=reference_task_ids,
|
||||
metadata=metadata,
|
||||
extensions=extensions,
|
||||
conversation_history=conversation_history,
|
||||
agent_id=agent_id,
|
||||
agent_role=agent_role,
|
||||
agent_branch=agent_branch,
|
||||
response_model=response_model,
|
||||
turn_number=turn_number,
|
||||
updates=updates,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
skill_id=skill_id,
|
||||
client_extensions=client_extensions,
|
||||
transport=transport,
|
||||
accepted_output_modes=accepted_output_modes,
|
||||
input_files=input_files,
|
||||
)
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
raise RuntimeError(
|
||||
"execute_a2a_delegation() cannot be called from an async context. "
|
||||
"Use 'await aexecute_a2a_delegation()' instead."
|
||||
)
|
||||
except RuntimeError as e:
|
||||
if "no running event loop" not in str(e).lower():
|
||||
raise
|
||||
has_running_loop = True
|
||||
except RuntimeError:
|
||||
has_running_loop = False
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
return loop.run_until_complete(
|
||||
aexecute_a2a_delegation(
|
||||
endpoint=endpoint,
|
||||
auth=auth,
|
||||
timeout=timeout,
|
||||
task_description=task_description,
|
||||
context=context,
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
reference_task_ids=reference_task_ids,
|
||||
metadata=metadata,
|
||||
extensions=extensions,
|
||||
conversation_history=conversation_history,
|
||||
agent_id=agent_id,
|
||||
agent_role=agent_role,
|
||||
agent_branch=agent_branch,
|
||||
response_model=response_model,
|
||||
turn_number=turn_number,
|
||||
updates=updates,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
skill_id=skill_id,
|
||||
client_extensions=client_extensions,
|
||||
transport=transport,
|
||||
accepted_output_modes=accepted_output_modes,
|
||||
input_files=input_files,
|
||||
)
|
||||
)
|
||||
finally:
|
||||
try:
|
||||
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||
finally:
|
||||
loop.close()
|
||||
if has_running_loop:
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
|
||||
return pool.submit(asyncio.run, coro).result()
|
||||
return asyncio.run(coro)
|
||||
|
||||
|
||||
async def aexecute_a2a_delegation(
|
||||
|
||||
Reference in New Issue
Block a user