mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-03-02 17:58:14 +00:00
Compare commits
1 Commits
cursor/har
...
devin/1772
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c2b151bd30 |
@@ -138,12 +138,24 @@ def fetch_agent_card(
|
||||
ttl_hash = int(time.time() // cache_ttl)
|
||||
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
|
||||
|
||||
coro = afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
|
||||
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
# Already inside an event loop (e.g. Jupyter notebook) - run the
|
||||
# coroutine in a separate thread with its own event loop.
|
||||
import concurrent.futures
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||
future = executor.submit(asyncio.run, coro)
|
||||
return future.result()
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
return loop.run_until_complete(
|
||||
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
|
||||
)
|
||||
return loop.run_until_complete(coro)
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
|
||||
@@ -158,13 +158,10 @@ def execute_a2a_delegation(
|
||||
) -> TaskStateResult:
|
||||
"""Execute a task delegation to a remote A2A agent synchronously.
|
||||
|
||||
WARNING: This function blocks the entire thread by creating and running a new
|
||||
event loop. Prefer using 'await aexecute_a2a_delegation()' in async contexts
|
||||
for better performance and resource efficiency.
|
||||
|
||||
This is a synchronous wrapper around aexecute_a2a_delegation that creates a
|
||||
new event loop to run the async implementation. It is provided for compatibility
|
||||
with synchronous code paths only.
|
||||
This is a synchronous wrapper around aexecute_a2a_delegation. When no event
|
||||
loop is running it creates a new one; when called from within an existing
|
||||
event loop (e.g. Jupyter notebooks) it transparently runs the coroutine in
|
||||
a separate thread to avoid "cannot run nested event loop" errors.
|
||||
|
||||
Args:
|
||||
endpoint: A2A agent endpoint URL (AgentCard URL).
|
||||
@@ -194,51 +191,51 @@ def execute_a2a_delegation(
|
||||
|
||||
Returns:
|
||||
TaskStateResult with status, result/error, history, and agent_card.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If called from an async context with a running event loop.
|
||||
"""
|
||||
coro = aexecute_a2a_delegation(
|
||||
endpoint=endpoint,
|
||||
auth=auth,
|
||||
timeout=timeout,
|
||||
task_description=task_description,
|
||||
context=context,
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
reference_task_ids=reference_task_ids,
|
||||
metadata=metadata,
|
||||
extensions=extensions,
|
||||
conversation_history=conversation_history,
|
||||
agent_id=agent_id,
|
||||
agent_role=agent_role,
|
||||
agent_branch=agent_branch,
|
||||
response_model=response_model,
|
||||
turn_number=turn_number,
|
||||
updates=updates,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
skill_id=skill_id,
|
||||
client_extensions=client_extensions,
|
||||
transport=transport,
|
||||
accepted_output_modes=accepted_output_modes,
|
||||
input_files=input_files,
|
||||
)
|
||||
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
raise RuntimeError(
|
||||
"execute_a2a_delegation() cannot be called from an async context. "
|
||||
"Use 'await aexecute_a2a_delegation()' instead."
|
||||
)
|
||||
except RuntimeError as e:
|
||||
if "no running event loop" not in str(e).lower():
|
||||
raise
|
||||
# Already inside an event loop (e.g. Jupyter notebook) - run the
|
||||
# coroutine in a separate thread with its own event loop so we don't
|
||||
# block or conflict with the running one.
|
||||
import concurrent.futures
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||
future = executor.submit(asyncio.run, coro)
|
||||
return future.result()
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
return loop.run_until_complete(
|
||||
aexecute_a2a_delegation(
|
||||
endpoint=endpoint,
|
||||
auth=auth,
|
||||
timeout=timeout,
|
||||
task_description=task_description,
|
||||
context=context,
|
||||
context_id=context_id,
|
||||
task_id=task_id,
|
||||
reference_task_ids=reference_task_ids,
|
||||
metadata=metadata,
|
||||
extensions=extensions,
|
||||
conversation_history=conversation_history,
|
||||
agent_id=agent_id,
|
||||
agent_role=agent_role,
|
||||
agent_branch=agent_branch,
|
||||
response_model=response_model,
|
||||
turn_number=turn_number,
|
||||
updates=updates,
|
||||
from_task=from_task,
|
||||
from_agent=from_agent,
|
||||
skill_id=skill_id,
|
||||
client_extensions=client_extensions,
|
||||
transport=transport,
|
||||
accepted_output_modes=accepted_output_modes,
|
||||
input_files=input_files,
|
||||
)
|
||||
)
|
||||
return loop.run_until_complete(coro)
|
||||
finally:
|
||||
try:
|
||||
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||
|
||||
110
lib/crewai/tests/a2a/utils/test_agent_card_event_loop.py
Normal file
110
lib/crewai/tests/a2a/utils/test_agent_card_event_loop.py
Normal file
@@ -0,0 +1,110 @@
|
||||
"""Tests for fetch_agent_card sync wrapper when a running event loop exists.
|
||||
|
||||
Covers the same class of issue as #4671 — ``fetch_agent_card()`` must work
|
||||
even when called from within an already-running event loop (e.g. Jupyter).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
from a2a.types import AgentCapabilities, AgentCard
|
||||
|
||||
from crewai.a2a.utils.agent_card import fetch_agent_card
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _mock_agent_card() -> AgentCard:
|
||||
"""Return a minimal ``AgentCard`` for mocking."""
|
||||
return AgentCard(
|
||||
name="Test Agent",
|
||||
description="A test agent",
|
||||
url="http://localhost:9999",
|
||||
version="1.0.0",
|
||||
capabilities=AgentCapabilities(streaming=False),
|
||||
default_input_modes=["text"],
|
||||
default_output_modes=["text"],
|
||||
skills=[],
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFetchAgentCardEventLoop:
|
||||
"""Verify fetch_agent_card works with and without a running loop."""
|
||||
|
||||
@patch("crewai.a2a.utils.agent_card.afetch_agent_card")
|
||||
def test_works_without_running_event_loop(
|
||||
self, mock_async_fn: AsyncMock
|
||||
) -> None:
|
||||
"""Normal case: no running event loop, should succeed directly."""
|
||||
expected = _mock_agent_card()
|
||||
mock_async_fn.return_value = expected
|
||||
|
||||
result = fetch_agent_card(
|
||||
endpoint="http://localhost:9999",
|
||||
use_cache=False,
|
||||
)
|
||||
|
||||
assert result.name == "Test Agent"
|
||||
mock_async_fn.assert_called_once()
|
||||
|
||||
@patch("crewai.a2a.utils.agent_card.afetch_agent_card")
|
||||
def test_works_inside_running_event_loop(
|
||||
self, mock_async_fn: AsyncMock
|
||||
) -> None:
|
||||
"""Regression test: must not raise when a loop is already running."""
|
||||
expected = _mock_agent_card()
|
||||
mock_async_fn.return_value = expected
|
||||
|
||||
result_holder: list[Any] = []
|
||||
error_holder: list[Exception] = []
|
||||
|
||||
async def _call_sync_from_async() -> None:
|
||||
try:
|
||||
res = fetch_agent_card(
|
||||
endpoint="http://localhost:9999",
|
||||
use_cache=False,
|
||||
)
|
||||
result_holder.append(res)
|
||||
except Exception as exc:
|
||||
error_holder.append(exc)
|
||||
|
||||
asyncio.run(_call_sync_from_async())
|
||||
|
||||
assert not error_holder, f"Unexpected error: {error_holder[0]}"
|
||||
assert len(result_holder) == 1
|
||||
assert result_holder[0].name == "Test Agent"
|
||||
|
||||
@patch("crewai.a2a.utils.agent_card.afetch_agent_card")
|
||||
def test_propagates_errors_inside_running_event_loop(
|
||||
self, mock_async_fn: AsyncMock
|
||||
) -> None:
|
||||
"""Errors should propagate even when called from a running loop."""
|
||||
mock_async_fn.side_effect = ConnectionError("cannot reach agent")
|
||||
|
||||
error_holder: list[Exception] = []
|
||||
|
||||
async def _call_sync_from_async() -> None:
|
||||
try:
|
||||
fetch_agent_card(
|
||||
endpoint="http://localhost:9999",
|
||||
use_cache=False,
|
||||
)
|
||||
except Exception as exc:
|
||||
error_holder.append(exc)
|
||||
|
||||
asyncio.run(_call_sync_from_async())
|
||||
|
||||
assert len(error_holder) == 1
|
||||
assert isinstance(error_holder[0], ConnectionError)
|
||||
assert "cannot reach agent" in str(error_holder[0])
|
||||
129
lib/crewai/tests/a2a/utils/test_delegation_event_loop.py
Normal file
129
lib/crewai/tests/a2a/utils/test_delegation_event_loop.py
Normal file
@@ -0,0 +1,129 @@
|
||||
"""Tests for A2A delegation sync wrappers when a running event loop exists.
|
||||
|
||||
Covers the fix for https://github.com/crewAIInc/crewAI/issues/4671 where
|
||||
``execute_a2a_delegation()`` raised ``RuntimeError`` when called from an
|
||||
environment that already has a running event loop (e.g. Jupyter notebooks).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import threading
|
||||
from typing import Any
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.a2a.task_helpers import TaskStateResult
|
||||
from crewai.a2a.utils.delegation import execute_a2a_delegation
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _minimal_task_state_result(**overrides: Any) -> TaskStateResult:
|
||||
"""Return a minimal ``TaskStateResult`` dict for mocking."""
|
||||
base: TaskStateResult = {
|
||||
"status": "completed",
|
||||
"result": "mocked result",
|
||||
"error": None,
|
||||
"history": [],
|
||||
"agent_card": None,
|
||||
}
|
||||
base.update(overrides) # type: ignore[typeddict-item]
|
||||
return base
|
||||
|
||||
|
||||
_DELEGATION_KWARGS: dict[str, Any] = dict(
|
||||
endpoint="http://localhost:9999/.well-known/agent-card.json",
|
||||
auth=None,
|
||||
timeout=30,
|
||||
task_description="test task",
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExecuteA2ADelegationEventLoop:
|
||||
"""Verify execute_a2a_delegation works with and without a running loop."""
|
||||
|
||||
@patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
|
||||
def test_works_without_running_event_loop(
|
||||
self, mock_async_fn: AsyncMock
|
||||
) -> None:
|
||||
"""Normal case: no running event loop, should succeed directly."""
|
||||
expected = _minimal_task_state_result()
|
||||
mock_async_fn.return_value = expected
|
||||
|
||||
result = execute_a2a_delegation(**_DELEGATION_KWARGS)
|
||||
|
||||
assert result["status"] == "completed"
|
||||
assert result["result"] == "mocked result"
|
||||
mock_async_fn.assert_called_once()
|
||||
|
||||
@patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
|
||||
def test_works_inside_running_event_loop(
|
||||
self, mock_async_fn: AsyncMock
|
||||
) -> None:
|
||||
"""Regression test for #4671: must not raise when a loop is running.
|
||||
|
||||
Simulates the Jupyter notebook environment by calling the sync wrapper
|
||||
from within an already-running event loop.
|
||||
"""
|
||||
expected = _minimal_task_state_result()
|
||||
mock_async_fn.return_value = expected
|
||||
|
||||
result_holder: list[Any] = []
|
||||
error_holder: list[Exception] = []
|
||||
|
||||
async def _call_sync_from_async() -> None:
|
||||
"""Call the sync function from within a running event loop."""
|
||||
try:
|
||||
# This must NOT raise RuntimeError anymore
|
||||
res = execute_a2a_delegation(**_DELEGATION_KWARGS)
|
||||
result_holder.append(res)
|
||||
except Exception as exc:
|
||||
error_holder.append(exc)
|
||||
|
||||
# Run inside an event loop, simulating a Jupyter notebook
|
||||
asyncio.run(_call_sync_from_async())
|
||||
|
||||
assert not error_holder, f"Unexpected error: {error_holder[0]}"
|
||||
assert len(result_holder) == 1
|
||||
assert result_holder[0]["status"] == "completed"
|
||||
assert result_holder[0]["result"] == "mocked result"
|
||||
|
||||
@patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
|
||||
def test_propagates_errors_from_async_fn(
|
||||
self, mock_async_fn: AsyncMock
|
||||
) -> None:
|
||||
"""Errors from the underlying async function should propagate."""
|
||||
mock_async_fn.side_effect = ConnectionError("remote agent down")
|
||||
|
||||
with pytest.raises(ConnectionError, match="remote agent down"):
|
||||
execute_a2a_delegation(**_DELEGATION_KWARGS)
|
||||
|
||||
@patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
|
||||
def test_propagates_errors_inside_running_event_loop(
|
||||
self, mock_async_fn: AsyncMock
|
||||
) -> None:
|
||||
"""Errors should propagate even when called from a running loop."""
|
||||
mock_async_fn.side_effect = ConnectionError("remote agent down")
|
||||
|
||||
error_holder: list[Exception] = []
|
||||
|
||||
async def _call_sync_from_async() -> None:
|
||||
try:
|
||||
execute_a2a_delegation(**_DELEGATION_KWARGS)
|
||||
except Exception as exc:
|
||||
error_holder.append(exc)
|
||||
|
||||
asyncio.run(_call_sync_from_async())
|
||||
|
||||
assert len(error_holder) == 1
|
||||
assert isinstance(error_holder[0], ConnectionError)
|
||||
assert "remote agent down" in str(error_holder[0])
|
||||
Reference in New Issue
Block a user