Compare commits

...

3 Commits

Author SHA1 Message Date
lorenzejay
80f6c17464 refactor(MCPNativeTool): remove unused shared loop thread and clean up code
- Eliminated the unused _mcp_shared_loop_thread variable to simplify the MCPNativeTool class.
- Updated the _ensure_loop method to remove redundant checks for the event loop's running state.
- Improved logging formatting for better readability during error handling.
2026-03-09 16:12:48 -07:00
lorenzejay
77ab203154 linted 2026-03-09 16:09:28 -07:00
lorenzejay
fa3b1d5f64 implement dedicated background event loop for MCP operations
- Introduced a shared event loop running in a background thread to manage MCP connections efficiently.
- Updated the MCPNativeTool class to ensure all async operations are dispatched to this dedicated loop, preventing cross-loop cancel-scope issues.
- Enhanced error handling during tool execution to ensure clean disconnection and reconnection processes.
2026-03-09 14:50:36 -07:00

View File

@@ -5,11 +5,20 @@ for better performance and connection management.
"""
import asyncio
import logging
import threading
from typing import Any
from crewai.tools import BaseTool
logger = logging.getLogger(__name__)
_mcp_loop_lock = threading.Lock()
_mcp_shared_loop: asyncio.AbstractEventLoop | None = None
class MCPNativeTool(BaseTool):
"""Native MCP tool that reuses client sessions.
@@ -17,8 +26,11 @@ class MCPNativeTool(BaseTool):
structured configurations. It reuses existing client sessions for
better performance and proper connection lifecycle management.
Unlike MCPToolWrapper which connects on-demand, this tool uses
a shared MCP client instance that maintains a persistent connection.
A dedicated background event loop is used for all MCP operations so that
anyio cancel scopes (used by streamable-HTTP and SSE transports) are
always entered and exited in the same async task. This avoids the
``RuntimeError: Attempted to exit cancel scope in a different task``
that occurs when ``asyncio.run()`` creates throwaway event loops.
"""
def __init__(
@@ -61,7 +73,40 @@ class MCPNativeTool(BaseTool):
self._mcp_client = mcp_client
self._original_tool_name = original_tool_name or tool_name
self._server_name = server_name
# self._logger = logging.getLogger(__name__)
@staticmethod
def _ensure_loop() -> asyncio.AbstractEventLoop:
"""Return a dedicated event loop running in a background thread.
The loop is shared across all MCPNativeTool instances so that MCP
connections from the same process coexist on a single loop, which
is both resource-efficient and avoids cross-loop cancel-scope issues.
"""
global _mcp_shared_loop
if _mcp_shared_loop is not None:
return _mcp_shared_loop
with _mcp_loop_lock:
# Double-check after acquiring the lock
if _mcp_shared_loop is not None:
return _mcp_shared_loop
loop = asyncio.new_event_loop()
thread = threading.Thread(
target=loop.run_forever,
daemon=True,
name="mcp-native-tool-loop",
)
thread.start()
_mcp_shared_loop = loop
return loop
def _run_in_dedicated_loop(self, coro: Any) -> Any:
"""Submit *coro* to the dedicated loop and block until it completes."""
loop = self._ensure_loop()
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result()
@property
def mcp_client(self) -> Any:
@@ -81,25 +126,13 @@ class MCPNativeTool(BaseTool):
def _run(self, **kwargs) -> str:
"""Execute tool using the MCP client session.
Args:
**kwargs: Arguments to pass to the MCP tool.
Returns:
Result from the MCP tool execution.
All async work is dispatched to a long-lived background event loop
so that transport-level cancel scopes stay within a single async
task, regardless of whether the caller is inside a Flow thread,
another event loop, or plain synchronous code.
"""
try:
try:
asyncio.get_running_loop()
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
coro = self._run_async(**kwargs)
future = executor.submit(asyncio.run, coro)
return future.result()
except RuntimeError:
return asyncio.run(self._run_async(**kwargs))
return self._run_in_dedicated_loop(self._run_async(**kwargs))
except Exception as e:
raise RuntimeError(
f"Error executing MCP tool {self.original_tool_name}: {e!s}"
@@ -108,24 +141,22 @@ class MCPNativeTool(BaseTool):
async def _run_async(self, **kwargs) -> str:
"""Async implementation of tool execution.
Args:
**kwargs: Arguments to pass to the MCP tool.
Returns:
Result from the MCP tool execution.
Each invocation owns its full connect → call → disconnect lifecycle
so that anyio cancel scopes are always entered and exited in the
same asyncio Task (run_coroutine_threadsafe creates a new Task per
submission).
"""
# Note: Since we use asyncio.run() which creates a new event loop each time,
# Always reconnect on-demand because asyncio.run() creates new event loops per call
# All MCP transport context managers (stdio, streamablehttp_client, sse_client)
# use anyio.create_task_group() which can't span different event loops
# Always start with a fresh connection in THIS task.
if self._mcp_client.connected:
await self._mcp_client.disconnect()
try:
await self._mcp_client.disconnect()
except Exception:
logger.debug("Failed to disconnect stale MCP client", exc_info=True)
await self._mcp_client.connect()
try:
result = await self._mcp_client.call_tool(self.original_tool_name, kwargs)
except Exception as e:
error_str = str(e).lower()
if (
@@ -133,26 +164,31 @@ class MCPNativeTool(BaseTool):
or "connection" in error_str
or "send" in error_str
):
await self._mcp_client.disconnect()
# Connection broke mid-call — reconnect and retry once
try:
await self._mcp_client.disconnect()
except Exception:
logger.debug(
"Failed to disconnect MCP client during retry", exc_info=True
)
await self._mcp_client.connect()
# Retry the call
result = await self._mcp_client.call_tool(
self.original_tool_name, kwargs
)
else:
raise
finally:
# Always disconnect after tool call to ensure clean context manager lifecycle
# This prevents "exit cancel scope in different task" errors
# All transport context managers must be exited in the same event loop they were entered
await self._mcp_client.disconnect()
try:
await self._mcp_client.disconnect()
except Exception:
logger.debug(
"Failed to disconnect MCP client during cleanup", exc_info=True
)
# Extract result content
if isinstance(result, str):
return result
# Handle various result formats
if hasattr(result, "content") and result.content:
if isinstance(result.content, list) and len(result.content) > 0:
content_item = result.content[0]