diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 54ae52ba8..1d94c4d19 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -860,19 +860,29 @@ class Agent(BaseAgent): try: try: - tools_list = asyncio.run(_setup_client_and_list_tools()) - except RuntimeError as e: - error_msg = str(e).lower() - if "cancel scope" in error_msg or "task" in error_msg: + asyncio.get_running_loop() + import concurrent.futures + + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit( + asyncio.run, _setup_client_and_list_tools() + ) + tools_list = future.result() + except RuntimeError: + try: + tools_list = asyncio.run(_setup_client_and_list_tools()) + except RuntimeError as e: + error_msg = str(e).lower() + if "cancel scope" in error_msg or "task" in error_msg: + raise ConnectionError( + "MCP connection failed due to event loop cleanup issues. " + "This may be due to authentication errors or server unavailability." + ) from e + except asyncio.CancelledError as e: raise ConnectionError( - "MCP connection failed due to event loop cleanup issues. " - "This may be due to authentication errors or server unavailability." + "MCP connection was cancelled. This may indicate an authentication " + "error or server unavailability." ) from e - except asyncio.CancelledError as e: - raise ConnectionError( - "MCP connection was cancelled. This may indicate an authentication " - "error or server unavailability." - ) from e if mcp_config.tool_filter: filtered_tools = [] diff --git a/lib/crewai/src/crewai/tools/mcp_native_tool.py b/lib/crewai/src/crewai/tools/mcp_native_tool.py index c10d51eee..f25b2f4d7 100644 --- a/lib/crewai/src/crewai/tools/mcp_native_tool.py +++ b/lib/crewai/src/crewai/tools/mcp_native_tool.py @@ -86,9 +86,17 @@ class MCPNativeTool(BaseTool): Result from the MCP tool execution. """ try: - # Always use asyncio.run() to create a fresh event loop - # This ensures the async context managers work correctly - return asyncio.run(self._run_async(**kwargs)) + try: + asyncio.get_running_loop() + + import concurrent.futures + + with concurrent.futures.ThreadPoolExecutor() as executor: + coro = self._run_async(**kwargs) + future = executor.submit(asyncio.run, coro) + return future.result() + except RuntimeError: + return asyncio.run(self._run_async(**kwargs)) except Exception as e: raise RuntimeError( diff --git a/lib/crewai/tests/mcp/test_mcp_config.py b/lib/crewai/tests/mcp/test_mcp_config.py index 627ceb6e2..e55a7d504 100644 --- a/lib/crewai/tests/mcp/test_mcp_config.py +++ b/lib/crewai/tests/mcp/test_mcp_config.py @@ -1,4 +1,5 @@ -from unittest.mock import AsyncMock, patch +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch import pytest from crewai.agent.core import Agent @@ -134,3 +135,66 @@ def test_agent_with_sse_mcp_config(mock_tool_definitions): transport = call_args.kwargs["transport"] assert transport.url == "https://api.example.com/mcp/sse" assert transport.headers == {"Authorization": "Bearer test_token"} + + +def test_mcp_tool_execution_in_sync_context(mock_tool_definitions): + """Test MCPNativeTool execution in synchronous context (normal crew execution).""" + http_config = MCPServerHTTP(url="https://api.example.com/mcp") + + with patch("crewai.agent.core.MCPClient") as mock_client_class: + mock_client = AsyncMock() + mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions) + mock_client.connected = False + mock_client.connect = AsyncMock() + mock_client.disconnect = AsyncMock() + mock_client.call_tool = AsyncMock(return_value="test result") + mock_client_class.return_value = mock_client + + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + mcps=[http_config], + ) + + tools = agent.get_mcp_tools([http_config]) + assert len(tools) == 2 + + + tool = tools[0] + result = tool.run(query="test query") + + assert result == "test result" + mock_client.call_tool.assert_called() + + +@pytest.mark.asyncio +async def test_mcp_tool_execution_in_async_context(mock_tool_definitions): + """Test MCPNativeTool execution in async context (e.g., from a Flow).""" + http_config = MCPServerHTTP(url="https://api.example.com/mcp") + + with patch("crewai.agent.core.MCPClient") as mock_client_class: + mock_client = AsyncMock() + mock_client.list_tools = AsyncMock(return_value=mock_tool_definitions) + mock_client.connected = False + mock_client.connect = AsyncMock() + mock_client.disconnect = AsyncMock() + mock_client.call_tool = AsyncMock(return_value="test result") + mock_client_class.return_value = mock_client + + agent = Agent( + role="Test Agent", + goal="Test goal", + backstory="Test backstory", + mcps=[http_config], + ) + + tools = agent.get_mcp_tools([http_config]) + assert len(tools) == 2 + + + tool = tools[0] + result = tool.run(query="test query") + + assert result == "test result" + mock_client.call_tool.assert_called()