Compare commits

..

1 Commit

Author SHA1 Message Date
Devin AI
c2b151bd30 fix: handle running event loop in A2A sync wrappers (#4671)
execute_a2a_delegation() and fetch_agent_card() now use a ThreadPoolExecutor
to run async code in a separate thread when an event loop is already running
(e.g. Jupyter notebooks), instead of raising RuntimeError.

Co-Authored-By: João <joao@crewai.com>
2026-03-02 10:57:07 +00:00
6 changed files with 296 additions and 314 deletions

View File

@@ -138,12 +138,24 @@ def fetch_agent_card(
ttl_hash = int(time.time() // cache_ttl)
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
coro = afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
try:
asyncio.get_running_loop()
# Already inside an event loop (e.g. Jupyter notebook) - run the
# coroutine in a separate thread with its own event loop.
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(asyncio.run, coro)
return future.result()
except RuntimeError:
pass
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
)
return loop.run_until_complete(coro)
finally:
loop.close()

View File

@@ -158,13 +158,10 @@ def execute_a2a_delegation(
) -> TaskStateResult:
"""Execute a task delegation to a remote A2A agent synchronously.
WARNING: This function blocks the entire thread by creating and running a new
event loop. Prefer using 'await aexecute_a2a_delegation()' in async contexts
for better performance and resource efficiency.
This is a synchronous wrapper around aexecute_a2a_delegation that creates a
new event loop to run the async implementation. It is provided for compatibility
with synchronous code paths only.
This is a synchronous wrapper around aexecute_a2a_delegation. When no event
loop is running it creates a new one; when called from within an existing
event loop (e.g. Jupyter notebooks) it transparently runs the coroutine in
a separate thread to avoid "cannot run nested event loop" errors.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL).
@@ -194,51 +191,51 @@ def execute_a2a_delegation(
Returns:
TaskStateResult with status, result/error, history, and agent_card.
Raises:
RuntimeError: If called from an async context with a running event loop.
"""
coro = aexecute_a2a_delegation(
endpoint=endpoint,
auth=auth,
timeout=timeout,
task_description=task_description,
context=context,
context_id=context_id,
task_id=task_id,
reference_task_ids=reference_task_ids,
metadata=metadata,
extensions=extensions,
conversation_history=conversation_history,
agent_id=agent_id,
agent_role=agent_role,
agent_branch=agent_branch,
response_model=response_model,
turn_number=turn_number,
updates=updates,
from_task=from_task,
from_agent=from_agent,
skill_id=skill_id,
client_extensions=client_extensions,
transport=transport,
accepted_output_modes=accepted_output_modes,
input_files=input_files,
)
try:
asyncio.get_running_loop()
raise RuntimeError(
"execute_a2a_delegation() cannot be called from an async context. "
"Use 'await aexecute_a2a_delegation()' instead."
)
except RuntimeError as e:
if "no running event loop" not in str(e).lower():
raise
# Already inside an event loop (e.g. Jupyter notebook) - run the
# coroutine in a separate thread with its own event loop so we don't
# block or conflict with the running one.
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(asyncio.run, coro)
return future.result()
except RuntimeError:
pass
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
aexecute_a2a_delegation(
endpoint=endpoint,
auth=auth,
timeout=timeout,
task_description=task_description,
context=context,
context_id=context_id,
task_id=task_id,
reference_task_ids=reference_task_ids,
metadata=metadata,
extensions=extensions,
conversation_history=conversation_history,
agent_id=agent_id,
agent_role=agent_role,
agent_branch=agent_branch,
response_model=response_model,
turn_number=turn_number,
updates=updates,
from_task=from_task,
from_agent=from_agent,
skill_id=skill_id,
client_extensions=client_extensions,
transport=transport,
accepted_output_modes=accepted_output_modes,
input_files=input_files,
)
)
return loop.run_until_complete(coro)
finally:
try:
loop.run_until_complete(loop.shutdown_asyncgens())

View File

@@ -214,13 +214,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
Dictionary with agent output.
"""
# Reset execution state for a fresh execution.
# When the same executor instance is reused across sequential tasks,
# stale messages and iteration counts must be cleared to prevent
# context pollution and premature max-iteration exits.
self.messages = []
self.iterations = 0
self._setup_messages(inputs)
self._inject_multimodal_files(inputs)
@@ -1122,13 +1115,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
Dictionary with agent output.
"""
# Reset execution state for a fresh execution.
# When the same executor instance is reused across sequential tasks,
# stale messages and iteration counts must be cleared to prevent
# context pollution and premature max-iteration exits.
self.messages = []
self.iterations = 0
self._setup_messages(inputs)
await self._ainject_multimodal_files(inputs)

View File

@@ -0,0 +1,110 @@
"""Tests for fetch_agent_card sync wrapper when a running event loop exists.
Covers the same class of issue as #4671 — ``fetch_agent_card()`` must work
even when called from within an already-running event loop (e.g. Jupyter).
"""
from __future__ import annotations
import asyncio
from typing import Any
from unittest.mock import AsyncMock, patch
import pytest
from a2a.types import AgentCapabilities, AgentCard
from crewai.a2a.utils.agent_card import fetch_agent_card
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mock_agent_card() -> AgentCard:
    """Build the minimal ``AgentCard`` these tests use as a mock return value."""
    # Streaming is irrelevant here; the wrapper under test never streams.
    capabilities = AgentCapabilities(streaming=False)
    return AgentCard(
        name="Test Agent",
        description="A test agent",
        url="http://localhost:9999",
        version="1.0.0",
        capabilities=capabilities,
        default_input_modes=["text"],
        default_output_modes=["text"],
        skills=[],
    )
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestFetchAgentCardEventLoop:
    """Verify fetch_agent_card works with and without a running loop."""

    @patch("crewai.a2a.utils.agent_card.afetch_agent_card")
    def test_works_without_running_event_loop(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Normal case: no running event loop, should succeed directly."""
        mock_async_fn.return_value = _mock_agent_card()

        card = fetch_agent_card(
            endpoint="http://localhost:9999",
            use_cache=False,
        )

        assert card.name == "Test Agent"
        mock_async_fn.assert_called_once()

    @patch("crewai.a2a.utils.agent_card.afetch_agent_card")
    def test_works_inside_running_event_loop(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Regression test: must not raise when a loop is already running."""
        mock_async_fn.return_value = _mock_agent_card()
        results: list[Any] = []
        errors: list[Exception] = []

        async def _call_sync_from_async() -> None:
            # Calling the sync wrapper while this coroutine's loop is running
            # simulates the Jupyter-notebook environment.
            try:
                results.append(
                    fetch_agent_card(
                        endpoint="http://localhost:9999",
                        use_cache=False,
                    )
                )
            except Exception as exc:
                errors.append(exc)

        asyncio.run(_call_sync_from_async())

        assert not errors, f"Unexpected error: {errors[0]}"
        assert len(results) == 1
        assert results[0].name == "Test Agent"

    @patch("crewai.a2a.utils.agent_card.afetch_agent_card")
    def test_propagates_errors_inside_running_event_loop(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Errors should propagate even when called from a running loop."""
        mock_async_fn.side_effect = ConnectionError("cannot reach agent")
        errors: list[Exception] = []

        async def _call_sync_from_async() -> None:
            try:
                fetch_agent_card(
                    endpoint="http://localhost:9999",
                    use_cache=False,
                )
            except Exception as exc:
                errors.append(exc)

        asyncio.run(_call_sync_from_async())

        assert len(errors) == 1
        assert isinstance(errors[0], ConnectionError)
        assert "cannot reach agent" in str(errors[0])

View File

@@ -0,0 +1,129 @@
"""Tests for A2A delegation sync wrappers when a running event loop exists.
Covers the fix for https://github.com/crewAIInc/crewAI/issues/4671 where
``execute_a2a_delegation()`` raised ``RuntimeError`` when called from an
environment that already has a running event loop (e.g. Jupyter notebooks).
"""
from __future__ import annotations
import asyncio
import threading
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.utils.delegation import execute_a2a_delegation
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _minimal_task_state_result(**overrides: Any) -> TaskStateResult:
    """Return a minimal ``TaskStateResult`` dict for mocking.

    Keyword arguments replace the corresponding default entries.
    """
    defaults = {
        "status": "completed",
        "result": "mocked result",
        "error": None,
        "history": [],
        "agent_card": None,
    }
    merged: TaskStateResult = {**defaults, **overrides}  # type: ignore[typeddict-item]
    return merged
_DELEGATION_KWARGS: dict[str, Any] = dict(
endpoint="http://localhost:9999/.well-known/agent-card.json",
auth=None,
timeout=30,
task_description="test task",
)
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestExecuteA2ADelegationEventLoop:
    """Verify execute_a2a_delegation works with and without a running loop."""

    @patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
    def test_works_without_running_event_loop(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Normal case: no running event loop, should succeed directly."""
        mock_async_fn.return_value = _minimal_task_state_result()

        outcome = execute_a2a_delegation(**_DELEGATION_KWARGS)

        assert outcome["status"] == "completed"
        assert outcome["result"] == "mocked result"
        mock_async_fn.assert_called_once()

    @patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
    def test_works_inside_running_event_loop(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Regression test for #4671: must not raise when a loop is running.

        Simulates the Jupyter notebook environment by calling the sync wrapper
        from within an already-running event loop.
        """
        mock_async_fn.return_value = _minimal_task_state_result()
        results: list[Any] = []
        errors: list[Exception] = []

        async def _call_sync_from_async() -> None:
            """Call the sync function from within a running event loop."""
            try:
                # This must NOT raise RuntimeError anymore
                results.append(execute_a2a_delegation(**_DELEGATION_KWARGS))
            except Exception as exc:
                errors.append(exc)

        # Run inside an event loop, simulating a Jupyter notebook
        asyncio.run(_call_sync_from_async())

        assert not errors, f"Unexpected error: {errors[0]}"
        assert len(results) == 1
        assert results[0]["status"] == "completed"
        assert results[0]["result"] == "mocked result"

    @patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
    def test_propagates_errors_from_async_fn(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Errors from the underlying async function should propagate."""
        mock_async_fn.side_effect = ConnectionError("remote agent down")

        with pytest.raises(ConnectionError, match="remote agent down"):
            execute_a2a_delegation(**_DELEGATION_KWARGS)

    @patch("crewai.a2a.utils.delegation.aexecute_a2a_delegation")
    def test_propagates_errors_inside_running_event_loop(
        self, mock_async_fn: AsyncMock
    ) -> None:
        """Errors should propagate even when called from a running loop."""
        mock_async_fn.side_effect = ConnectionError("remote agent down")
        errors: list[Exception] = []

        async def _call_sync_from_async() -> None:
            try:
                execute_a2a_delegation(**_DELEGATION_KWARGS)
            except Exception as exc:
                errors.append(exc)

        asyncio.run(_call_sync_from_async())

        assert len(errors) == 1
        assert isinstance(errors[0], ConnectionError)
        assert "remote agent down" in str(errors[0])

View File

@@ -1,252 +0,0 @@
"""Tests for CrewAgentExecutor state reset between task executions.
Verifies that messages and iterations are properly cleared when the same
executor instance is reused across sequential tasks, preventing context
pollution and premature max-iteration exits.
Related issues: #4319, #4389, #4661
"""
from __future__ import annotations
from typing import Any
from unittest.mock import MagicMock, Mock, patch
import pytest
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.agents.parser import AgentFinish
@pytest.fixture
def _make_executor() -> callable:
    """Factory fixture that builds a CrewAgentExecutor with mocked deps."""

    def _factory(**overrides: Any) -> CrewAgentExecutor:
        # Plain completion LLM: stop words supported, no function calling.
        llm = Mock()
        llm.supports_stop_words.return_value = True
        llm.stop = []
        llm.supports_function_calling.return_value = False

        # Task without any structured-output configuration.
        task = Mock()
        task.id = "task-1"
        task.description = "Test task"
        task.human_input = False
        task.response_model = None
        task.output_pydantic = None
        task.output_json = None

        crew = MagicMock()
        crew.id = "crew-1"
        crew._memory = None
        crew._train = False
        crew.verbose = False

        agent = MagicMock()
        agent.id = "agent-1"
        agent.role = "Tester"
        agent.verbose = False
        agent.key = "tester-key"
        agent.security_config = MagicMock()
        agent.security_config.fingerprint = "fp-123"

        tools_handler = Mock()
        tools_handler.cache = None

        kwargs: dict[str, Any] = {
            "llm": llm,
            "task": task,
            "crew": crew,
            "agent": agent,
            "prompt": {"system": "You are a helper.", "user": "Do {input}"},
            "max_iter": 10,
            "tools": [],
            "tools_names": "",
            "stop_words": ["Observation:"],
            "tools_description": "",
            "tools_handler": tools_handler,
        }
        # Per-test overrides win over the defaults above.
        return CrewAgentExecutor(**{**kwargs, **overrides})

    return _factory
class TestCrewAgentExecutorStateReset:
    """Ensure invoke() and ainvoke() reset execution state."""

    # ------------------------------------------------------------------
    # Synchronous invoke
    # ------------------------------------------------------------------
    @patch(
        "crewai.agents.crew_agent_executor.get_all_files",
        return_value=None,
    )
    @patch(
        "crewai.agents.crew_agent_executor.get_llm_response",
        return_value="Final Answer: done",
    )
    def test_invoke_resets_messages_between_calls(
        self,
        _mock_llm_response: Mock,
        _mock_files: Mock,
        _make_executor: callable,
    ) -> None:
        """Messages list should be fresh on every invoke() call."""
        executor = _make_executor()
        inputs: dict[str, str] = {"input": "task-one", "tool_names": "", "tools": ""}

        # First invocation must populate the conversation.
        executor.invoke(inputs)
        assert list(executor.messages)

        # Reuse the same executor, as sequential tasks do.
        executor.invoke(inputs)

        # A fresh run contributes exactly one system message via
        # _setup_messages; more than one means state leaked from run one.
        system_msgs = [m for m in executor.messages if m.get("role") == "system"]
        assert len(system_msgs) == 1, (
            f"Expected exactly 1 system message after second invoke, "
            f"got {len(system_msgs)}"
        )

    @patch(
        "crewai.agents.crew_agent_executor.get_all_files",
        return_value=None,
    )
    @patch(
        "crewai.agents.crew_agent_executor.get_llm_response",
        return_value="Final Answer: done",
    )
    def test_invoke_resets_iterations_between_calls(
        self,
        _mock_llm_response: Mock,
        _mock_files: Mock,
        _make_executor: callable,
    ) -> None:
        """Iterations counter should reset to 0 on every invoke() call."""
        executor = _make_executor()
        inputs: dict[str, str] = {"input": "task-one", "tool_names": "", "tools": ""}

        executor.invoke(inputs)
        assert executor.iterations > 0, "iterations should have incremented"

        # After a reset, a single-pass answer leaves the counter at 1.
        executor.invoke(inputs)
        assert executor.iterations == 1, (
            f"Expected iterations == 1 after reset, got {executor.iterations}"
        )

    # ------------------------------------------------------------------
    # Asynchronous ainvoke
    # ------------------------------------------------------------------
    @pytest.mark.asyncio
    @patch(
        "crewai.agents.crew_agent_executor.aget_all_files",
        return_value=None,
    )
    @patch(
        "crewai.agents.crew_agent_executor.aget_llm_response",
        return_value="Final Answer: done",
    )
    async def test_ainvoke_resets_messages_between_calls(
        self,
        _mock_llm_response: Mock,
        _mock_files: Mock,
        _make_executor: callable,
    ) -> None:
        """Messages list should be fresh on every ainvoke() call."""
        executor = _make_executor()
        inputs: dict[str, str] = {"input": "task-one", "tool_names": "", "tools": ""}

        await executor.ainvoke(inputs)
        assert list(executor.messages)

        await executor.ainvoke(inputs)
        system_msgs = [m for m in executor.messages if m.get("role") == "system"]
        assert len(system_msgs) == 1, (
            f"Expected exactly 1 system message after second ainvoke, "
            f"got {len(system_msgs)}"
        )

    @pytest.mark.asyncio
    @patch(
        "crewai.agents.crew_agent_executor.aget_all_files",
        return_value=None,
    )
    @patch(
        "crewai.agents.crew_agent_executor.aget_llm_response",
        return_value="Final Answer: done",
    )
    async def test_ainvoke_resets_iterations_between_calls(
        self,
        _mock_llm_response: Mock,
        _mock_files: Mock,
        _make_executor: callable,
    ) -> None:
        """Iterations counter should reset to 0 on every ainvoke() call."""
        executor = _make_executor()
        inputs: dict[str, str] = {"input": "task-one", "tool_names": "", "tools": ""}

        await executor.ainvoke(inputs)
        assert executor.iterations > 0

        await executor.ainvoke(inputs)
        assert executor.iterations == 1, (
            f"Expected iterations == 1 after reset, got {executor.iterations}"
        )

    # ------------------------------------------------------------------
    # Regression: multiple sequential tasks via the same executor
    # ------------------------------------------------------------------
    @patch(
        "crewai.agents.crew_agent_executor.get_all_files",
        return_value=None,
    )
    @patch(
        "crewai.agents.crew_agent_executor.get_llm_response",
        return_value="Final Answer: result",
    )
    def test_no_context_leak_across_three_sequential_invokes(
        self,
        _mock_llm_response: Mock,
        _mock_files: Mock,
        _make_executor: callable,
    ) -> None:
        """Simulates an agent reused across 3 sequential tasks.

        After each invoke the message count should be consistent and not
        grow unboundedly.
        """
        executor = _make_executor()
        inputs: dict[str, str] = {"input": "task", "tool_names": "", "tools": ""}

        counts: list[int] = []
        for _ in range(3):
            executor.invoke(inputs)
            counts.append(len(executor.messages))

        # All three runs should produce the same number of messages.
        assert len(set(counts)) == 1, (
            f"Message counts diverged across sequential invokes: {counts}"
        )