mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-02 05:38:12 +00:00
implement dedicated background event loop for MCP operations
- Introduced a shared event loop running in a background thread to manage MCP connections efficiently. - Updated the MCPNativeTool class to ensure all async operations are dispatched to this dedicated loop, preventing cross-loop cancel-scope issues. - Enhanced error handling during tool execution to ensure clean disconnection and reconnection processes.
This commit is contained in:
@@ -5,11 +5,17 @@ for better performance and connection management.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import threading
|
||||
from typing import Any
|
||||
|
||||
from crewai.tools import BaseTool
|
||||
|
||||
|
||||
_mcp_loop_lock = threading.Lock()
|
||||
_mcp_shared_loop: asyncio.AbstractEventLoop | None = None
|
||||
_mcp_shared_loop_thread: threading.Thread | None = None
|
||||
|
||||
|
||||
class MCPNativeTool(BaseTool):
|
||||
"""Native MCP tool that reuses client sessions.
|
||||
|
||||
@@ -17,8 +23,11 @@ class MCPNativeTool(BaseTool):
|
||||
structured configurations. It reuses existing client sessions for
|
||||
better performance and proper connection lifecycle management.
|
||||
|
||||
Unlike MCPToolWrapper which connects on-demand, this tool uses
|
||||
a shared MCP client instance that maintains a persistent connection.
|
||||
A dedicated background event loop is used for all MCP operations so that
|
||||
anyio cancel scopes (used by streamable-HTTP and SSE transports) are
|
||||
always entered and exited in the same async task. This avoids the
|
||||
``RuntimeError: Attempted to exit cancel scope in a different task``
|
||||
that occurs when ``asyncio.run()`` creates throwaway event loops.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@@ -61,7 +70,44 @@ class MCPNativeTool(BaseTool):
|
||||
self._mcp_client = mcp_client
|
||||
self._original_tool_name = original_tool_name or tool_name
|
||||
self._server_name = server_name
|
||||
# self._logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _ensure_loop() -> asyncio.AbstractEventLoop:
|
||||
"""Return a dedicated event loop running in a background thread.
|
||||
|
||||
The loop is shared across all MCPNativeTool instances so that MCP
|
||||
connections from the same process coexist on a single loop, which
|
||||
is both resource-efficient and avoids cross-loop cancel-scope issues.
|
||||
"""
|
||||
global _mcp_shared_loop, _mcp_shared_loop_thread
|
||||
|
||||
if _mcp_shared_loop is not None and _mcp_shared_loop.is_running():
|
||||
return _mcp_shared_loop
|
||||
|
||||
with _mcp_loop_lock:
|
||||
# Double-check after acquiring the lock
|
||||
if _mcp_shared_loop is not None and _mcp_shared_loop.is_running():
|
||||
return _mcp_shared_loop
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
thread = threading.Thread(
|
||||
target=loop.run_forever,
|
||||
daemon=True,
|
||||
name="mcp-native-tool-loop",
|
||||
)
|
||||
thread.start()
|
||||
_mcp_shared_loop = loop
|
||||
_mcp_shared_loop_thread = thread
|
||||
return loop
|
||||
|
||||
def _run_in_dedicated_loop(self, coro: Any) -> Any:
|
||||
"""Submit *coro* to the dedicated loop and block until it completes."""
|
||||
loop = self._ensure_loop()
|
||||
future = asyncio.run_coroutine_threadsafe(coro, loop)
|
||||
return future.result()
|
||||
|
||||
|
||||
@property
|
||||
def mcp_client(self) -> Any:
|
||||
@@ -81,25 +127,13 @@ class MCPNativeTool(BaseTool):
|
||||
def _run(self, **kwargs) -> str:
|
||||
"""Execute tool using the MCP client session.
|
||||
|
||||
Args:
|
||||
**kwargs: Arguments to pass to the MCP tool.
|
||||
|
||||
Returns:
|
||||
Result from the MCP tool execution.
|
||||
All async work is dispatched to a long-lived background event loop
|
||||
so that transport-level cancel scopes stay within a single async
|
||||
task, regardless of whether the caller is inside a Flow thread,
|
||||
another event loop, or plain synchronous code.
|
||||
"""
|
||||
try:
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
|
||||
import concurrent.futures
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
coro = self._run_async(**kwargs)
|
||||
future = executor.submit(asyncio.run, coro)
|
||||
return future.result()
|
||||
except RuntimeError:
|
||||
return asyncio.run(self._run_async(**kwargs))
|
||||
|
||||
return self._run_in_dedicated_loop(self._run_async(**kwargs))
|
||||
except Exception as e:
|
||||
raise RuntimeError(
|
||||
f"Error executing MCP tool {self.original_tool_name}: {e!s}"
|
||||
@@ -108,24 +142,22 @@ class MCPNativeTool(BaseTool):
|
||||
async def _run_async(self, **kwargs) -> str:
|
||||
"""Async implementation of tool execution.
|
||||
|
||||
Args:
|
||||
**kwargs: Arguments to pass to the MCP tool.
|
||||
|
||||
Returns:
|
||||
Result from the MCP tool execution.
|
||||
Each invocation owns its full connect → call → disconnect lifecycle
|
||||
so that anyio cancel scopes are always entered and exited in the
|
||||
same asyncio Task (run_coroutine_threadsafe creates a new Task per
|
||||
submission).
|
||||
"""
|
||||
# Note: Since we use asyncio.run() which creates a new event loop each time,
|
||||
# Always reconnect on-demand because asyncio.run() creates new event loops per call
|
||||
# All MCP transport context managers (stdio, streamablehttp_client, sse_client)
|
||||
# use anyio.create_task_group() which can't span different event loops
|
||||
# Always start with a fresh connection in THIS task.
|
||||
if self._mcp_client.connected:
|
||||
await self._mcp_client.disconnect()
|
||||
try:
|
||||
await self._mcp_client.disconnect()
|
||||
except Exception:
|
||||
pass # best-effort; may already be stale
|
||||
|
||||
await self._mcp_client.connect()
|
||||
|
||||
try:
|
||||
result = await self._mcp_client.call_tool(self.original_tool_name, kwargs)
|
||||
|
||||
except Exception as e:
|
||||
error_str = str(e).lower()
|
||||
if (
|
||||
@@ -133,26 +165,27 @@ class MCPNativeTool(BaseTool):
|
||||
or "connection" in error_str
|
||||
or "send" in error_str
|
||||
):
|
||||
await self._mcp_client.disconnect()
|
||||
# Connection broke mid-call — reconnect and retry once
|
||||
try:
|
||||
await self._mcp_client.disconnect()
|
||||
except Exception:
|
||||
pass # best-effort cleanup
|
||||
await self._mcp_client.connect()
|
||||
# Retry the call
|
||||
result = await self._mcp_client.call_tool(
|
||||
self.original_tool_name, kwargs
|
||||
)
|
||||
else:
|
||||
raise
|
||||
|
||||
finally:
|
||||
# Always disconnect after tool call to ensure clean context manager lifecycle
|
||||
# This prevents "exit cancel scope in different task" errors
|
||||
# All transport context managers must be exited in the same event loop they were entered
|
||||
await self._mcp_client.disconnect()
|
||||
try:
|
||||
await self._mcp_client.disconnect()
|
||||
except Exception:
|
||||
pass # best-effort cleanup
|
||||
|
||||
# Extract result content
|
||||
if isinstance(result, str):
|
||||
return result
|
||||
|
||||
# Handle various result formats
|
||||
if hasattr(result, "content") and result.content:
|
||||
if isinstance(result.content, list) and len(result.content) > 0:
|
||||
content_item = result.content[0]
|
||||
|
||||
Reference in New Issue
Block a user