mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
refactor: improve MCP tool execution handling with concurrent futures (#3854)
- Enhanced the MCP tool execution in both synchronous and asynchronous contexts by utilizing for better event loop management. - Updated error handling to provide clearer messages for connection issues and task cancellations. - Added tests to validate MCP tool execution in both sync and async scenarios, ensuring robust functionality across different contexts.
This commit is contained in:
@@ -860,19 +860,29 @@ class Agent(BaseAgent):
|
||||
|
||||
try:
|
||||
try:
|
||||
tools_list = asyncio.run(_setup_client_and_list_tools())
|
||||
except RuntimeError as e:
|
||||
error_msg = str(e).lower()
|
||||
if "cancel scope" in error_msg or "task" in error_msg:
|
||||
asyncio.get_running_loop()
|
||||
import concurrent.futures
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
future = executor.submit(
|
||||
asyncio.run, _setup_client_and_list_tools()
|
||||
)
|
||||
tools_list = future.result()
|
||||
except RuntimeError:
|
||||
try:
|
||||
tools_list = asyncio.run(_setup_client_and_list_tools())
|
||||
except RuntimeError as e:
|
||||
error_msg = str(e).lower()
|
||||
if "cancel scope" in error_msg or "task" in error_msg:
|
||||
raise ConnectionError(
|
||||
"MCP connection failed due to event loop cleanup issues. "
|
||||
"This may be due to authentication errors or server unavailability."
|
||||
) from e
|
||||
except asyncio.CancelledError as e:
|
||||
raise ConnectionError(
|
||||
"MCP connection failed due to event loop cleanup issues. "
|
||||
"This may be due to authentication errors or server unavailability."
|
||||
"MCP connection was cancelled. This may indicate an authentication "
|
||||
"error or server unavailability."
|
||||
) from e
|
||||
except asyncio.CancelledError as e:
|
||||
raise ConnectionError(
|
||||
"MCP connection was cancelled. This may indicate an authentication "
|
||||
"error or server unavailability."
|
||||
) from e
|
||||
|
||||
if mcp_config.tool_filter:
|
||||
filtered_tools = []
|
||||
|
||||
@@ -86,9 +86,17 @@ class MCPNativeTool(BaseTool):
|
||||
Result from the MCP tool execution.
|
||||
"""
|
||||
try:
|
||||
# Always use asyncio.run() to create a fresh event loop
|
||||
# This ensures the async context managers work correctly
|
||||
return asyncio.run(self._run_async(**kwargs))
|
||||
try:
|
||||
asyncio.get_running_loop()
|
||||
|
||||
import concurrent.futures
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
coro = self._run_async(**kwargs)
|
||||
future = executor.submit(asyncio.run, coro)
|
||||
return future.result()
|
||||
except RuntimeError:
|
||||
return asyncio.run(self._run_async(**kwargs))
|
||||
|
||||
except Exception as e:
|
||||
raise RuntimeError(
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from unittest.mock import AsyncMock, patch
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from crewai.agent.core import Agent
|
||||
@@ -134,3 +135,66 @@ def test_agent_with_sse_mcp_config(mock_tool_definitions):
|
||||
transport = call_args.kwargs["transport"]
|
||||
assert transport.url == "https://api.example.com/mcp/sse"
|
||||
assert transport.headers == {"Authorization": "Bearer test_token"}
|
||||
|
||||
|
||||
def test_mcp_tool_execution_in_sync_context(mock_tool_definitions):
|
||||
"""Test MCPNativeTool execution in synchronous context (normal crew execution)."""
|
||||
http_config = MCPServerHTTP(url="https://api.example.com/mcp")
|
||||
|
||||
with patch("crewai.agent.core.MCPClient") as mock_client_class:
|
||||
mock_client = AsyncMock()
|
||||
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
|
||||
mock_client.connected = False
|
||||
mock_client.connect = AsyncMock()
|
||||
mock_client.disconnect = AsyncMock()
|
||||
mock_client.call_tool = AsyncMock(return_value="test result")
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
mcps=[http_config],
|
||||
)
|
||||
|
||||
tools = agent.get_mcp_tools([http_config])
|
||||
assert len(tools) == 2
|
||||
|
||||
|
||||
tool = tools[0]
|
||||
result = tool.run(query="test query")
|
||||
|
||||
assert result == "test result"
|
||||
mock_client.call_tool.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mcp_tool_execution_in_async_context(mock_tool_definitions):
|
||||
"""Test MCPNativeTool execution in async context (e.g., from a Flow)."""
|
||||
http_config = MCPServerHTTP(url="https://api.example.com/mcp")
|
||||
|
||||
with patch("crewai.agent.core.MCPClient") as mock_client_class:
|
||||
mock_client = AsyncMock()
|
||||
mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions)
|
||||
mock_client.connected = False
|
||||
mock_client.connect = AsyncMock()
|
||||
mock_client.disconnect = AsyncMock()
|
||||
mock_client.call_tool = AsyncMock(return_value="test result")
|
||||
mock_client_class.return_value = mock_client
|
||||
|
||||
agent = Agent(
|
||||
role="Test Agent",
|
||||
goal="Test goal",
|
||||
backstory="Test backstory",
|
||||
mcps=[http_config],
|
||||
)
|
||||
|
||||
tools = agent.get_mcp_tools([http_config])
|
||||
assert len(tools) == 2
|
||||
|
||||
|
||||
tool = tools[0]
|
||||
result = tool.run(query="test query")
|
||||
|
||||
assert result == "test result"
|
||||
mock_client.call_tool.assert_called()
|
||||
|
||||
Reference in New Issue
Block a user