From fa3b1d5f64cf7642039b65e9aabcb6feb6a6751d Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Mon, 9 Mar 2026 14:50:36 -0700 Subject: [PATCH] implement dedicated background event loop for MCP operations - Introduced a shared event loop running in a background thread to manage MCP connections efficiently. - Updated the MCPNativeTool class to ensure all async operations are dispatched to this dedicated loop, preventing cross-loop cancel-scope issues. - Enhanced error handling during tool execution to ensure clean disconnection and reconnection processes. --- .../src/crewai/tools/mcp_native_tool.py | 111 ++++++++++++------ 1 file changed, 72 insertions(+), 39 deletions(-) diff --git a/lib/crewai/src/crewai/tools/mcp_native_tool.py b/lib/crewai/src/crewai/tools/mcp_native_tool.py index d14c26a5a..ffae56c2a 100644 --- a/lib/crewai/src/crewai/tools/mcp_native_tool.py +++ b/lib/crewai/src/crewai/tools/mcp_native_tool.py @@ -5,11 +5,17 @@ for better performance and connection management. """ import asyncio +import threading from typing import Any from crewai.tools import BaseTool +_mcp_loop_lock = threading.Lock() +_mcp_shared_loop: asyncio.AbstractEventLoop | None = None +_mcp_shared_loop_thread: threading.Thread | None = None + + class MCPNativeTool(BaseTool): """Native MCP tool that reuses client sessions. @@ -17,8 +23,11 @@ class MCPNativeTool(BaseTool): structured configurations. It reuses existing client sessions for better performance and proper connection lifecycle management. - Unlike MCPToolWrapper which connects on-demand, this tool uses - a shared MCP client instance that maintains a persistent connection. + A dedicated background event loop is used for all MCP operations so that + anyio cancel scopes (used by streamable-HTTP and SSE transports) are + always entered and exited in the same async task. This avoids the + ``RuntimeError: Attempted to exit cancel scope in a different task`` + that occurs when ``asyncio.run()`` creates throwaway event loops. """ def __init__( @@ -61,7 +70,44 @@ class MCPNativeTool(BaseTool): self._mcp_client = mcp_client self._original_tool_name = original_tool_name or tool_name self._server_name = server_name - # self._logger = logging.getLogger(__name__) + + + + @staticmethod + def _ensure_loop() -> asyncio.AbstractEventLoop: + """Return a dedicated event loop running in a background thread. + + The loop is shared across all MCPNativeTool instances so that MCP + connections from the same process coexist on a single loop, which + is both resource-efficient and avoids cross-loop cancel-scope issues. + """ + global _mcp_shared_loop, _mcp_shared_loop_thread + + if _mcp_shared_loop is not None and _mcp_shared_loop.is_running(): + return _mcp_shared_loop + + with _mcp_loop_lock: + # Double-check after acquiring the lock + if _mcp_shared_loop is not None and _mcp_shared_loop.is_running(): + return _mcp_shared_loop + + loop = asyncio.new_event_loop() + thread = threading.Thread( + target=loop.run_forever, + daemon=True, + name="mcp-native-tool-loop", + ) + thread.start() + _mcp_shared_loop = loop + _mcp_shared_loop_thread = thread + return loop + + def _run_in_dedicated_loop(self, coro: Any) -> Any: + """Submit *coro* to the dedicated loop and block until it completes.""" + loop = self._ensure_loop() + future = asyncio.run_coroutine_threadsafe(coro, loop) + return future.result() + @property def mcp_client(self) -> Any: @@ -81,25 +127,13 @@ class MCPNativeTool(BaseTool): def _run(self, **kwargs) -> str: """Execute tool using the MCP client session. - Args: - **kwargs: Arguments to pass to the MCP tool. - - Returns: - Result from the MCP tool execution. + All async work is dispatched to a long-lived background event loop + so that transport-level cancel scopes stay within a single async + task, regardless of whether the caller is inside a Flow thread, + another event loop, or plain synchronous code. """ try: - try: - asyncio.get_running_loop() - - import concurrent.futures - - with concurrent.futures.ThreadPoolExecutor() as executor: - coro = self._run_async(**kwargs) - future = executor.submit(asyncio.run, coro) - return future.result() - except RuntimeError: - return asyncio.run(self._run_async(**kwargs)) - + return self._run_in_dedicated_loop(self._run_async(**kwargs)) except Exception as e: raise RuntimeError( f"Error executing MCP tool {self.original_tool_name}: {e!s}" @@ -108,24 +142,22 @@ class MCPNativeTool(BaseTool): async def _run_async(self, **kwargs) -> str: """Async implementation of tool execution. - Args: - **kwargs: Arguments to pass to the MCP tool. - - Returns: - Result from the MCP tool execution. + Each invocation owns its full connect → call → disconnect lifecycle + so that anyio cancel scopes are always entered and exited in the + same asyncio Task (run_coroutine_threadsafe creates a new Task per + submission). """ - # Note: Since we use asyncio.run() which creates a new event loop each time, - # Always reconnect on-demand because asyncio.run() creates new event loops per call - # All MCP transport context managers (stdio, streamablehttp_client, sse_client) - # use anyio.create_task_group() which can't span different event loops + # Always start with a fresh connection in THIS task. if self._mcp_client.connected: - await self._mcp_client.disconnect() + try: + await self._mcp_client.disconnect() + except Exception: + pass # best-effort; may already be stale await self._mcp_client.connect() try: result = await self._mcp_client.call_tool(self.original_tool_name, kwargs) - except Exception as e: error_str = str(e).lower() if ( @@ -133,26 +165,27 @@ class MCPNativeTool(BaseTool): or "connection" in error_str or "send" in error_str ): - await self._mcp_client.disconnect() + # Connection broke mid-call — reconnect and retry once + try: + await self._mcp_client.disconnect() + except Exception: + pass # best-effort cleanup await self._mcp_client.connect() - # Retry the call result = await self._mcp_client.call_tool( self.original_tool_name, kwargs ) else: raise - finally: - # Always disconnect after tool call to ensure clean context manager lifecycle - # This prevents "exit cancel scope in different task" errors - # All transport context managers must be exited in the same event loop they were entered - await self._mcp_client.disconnect() + try: + await self._mcp_client.disconnect() + except Exception: + pass # best-effort cleanup # Extract result content if isinstance(result, str): return result - # Handle various result formats if hasattr(result, "content") and result.content: if isinstance(result.content, list) and len(result.content) > 0: content_item = result.content[0]