From 0cd1485781a163c3182b6b2ddcfb8c3a24b37daf Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Sat, 1 Mar 2025 18:06:11 +0800 Subject: [PATCH] tmp save --- .../router/RouterClientProtocol.java | 4 +- .../async/RouterAsyncClientProtocol.java | 112 +++++++++--- .../router/TestRouterMountTable.java | 27 +-- .../async/TestRouterAsyncMountTable.java | 173 ++++++++++++++++++ .../fsdataset/impl/FsDatasetImpl.java | 4 +- 5 files changed, 274 insertions(+), 46 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTable.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index cab4fad19096f..d4b40404b550e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -2225,7 +2225,7 @@ protected static FsPermission getParentPermission(final FsPermission mask) { * @return New HDFS file status representing a mount point. */ @VisibleForTesting - protected HdfsFileStatus getMountPointStatus( + public HdfsFileStatus getMountPointStatus( String name, int childrenNum, long date) { return getMountPointStatus(name, childrenNum, date, true); } @@ -2240,7 +2240,7 @@ protected HdfsFileStatus getMountPointStatus( * @return New HDFS file status representing a mount point. */ @VisibleForTesting - protected HdfsFileStatus getMountPointStatus( + public HdfsFileStatus getMountPointStatus( String name, int childrenNum, long date, boolean setPath) { long modTime = date; long accessTime = date; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java index f991b27b029e4..b41511663cf68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; @@ -42,6 +43,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException; import org.apache.hadoop.hdfs.server.federation.router.NoLocationException; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; import org.apache.hadoop.hdfs.server.federation.router.RemoteParam; import org.apache.hadoop.hdfs.server.federation.router.RemoteResult; @@ -82,6 +84,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncFinally; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry; @@ -104,6 +107,8 @@ public class RouterAsyncClientProtocol extends RouterClientProtocol { private final boolean allowPartialList; /** Time out when getting the mount statistics. */ private long mountStatusTimeOut; + /** Default nameservice enabled. */ + private final boolean defaultNameServiceEnabled; /** Identifier for the super user. */ private String superUser; /** Identifier for the super group. */ @@ -126,6 +131,9 @@ public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer) this.mountStatusTimeOut = getMountStatusTimeOut(); this.superUser = getSuperUser(); this.superGroup = getSuperGroup(); + this.defaultNameServiceEnabled = conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE, + RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT); } @Override @@ -348,7 +356,6 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) return rpcClient.invokeAll(locations, method); } - asyncComplete(false); if (locations.size() > 1) { // Check if this directory already exists asyncTry(() -> { @@ -367,14 +374,24 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) src, ex.getMessage()); return false; }, IOException.class); - } + asyncApply((AsyncApplyFunction)ret -> { + final RemoteLocation firstLocation = locations.get(0); + asyncTry(() -> { + rpcClient.invokeSingle(firstLocation, method, Boolean.class); + }); - final RemoteLocation firstLocation = locations.get(0); - asyncApply((AsyncApplyFunction) success -> { - if (success) { - asyncComplete(true); - return; - } + asyncCatch((AsyncCatchFunction) (o, ioe) -> { + final List newLocations = checkFaultTolerantRetry( + method, src, ioe, firstLocation, locations); + rpcClient.invokeSequential( + newLocations, method, Boolean.class, Boolean.TRUE); + }, IOException.class); + asyncApply((ApplyFunction) success -> { + return success; + }); + }); + } else { + final RemoteLocation firstLocation = locations.get(0); asyncTry(() -> { rpcClient.invokeSingle(firstLocation, method, Boolean.class); }); @@ -385,7 +402,11 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) rpcClient.invokeSequential( newLocations, method, Boolean.class, Boolean.TRUE); }, IOException.class); - }); + + asyncApply((ApplyFunction) success -> { + return success; + }); + } return asyncReturn(Boolean.class); } @@ -480,6 +501,7 @@ public DirectoryListing getListing( return null; }); }); + boolean finalNamenodeListingExists = namenodeListingExists; asyncApply(o -> { // Update the remaining count to include left mount points if (nnListing.size() > 0) { @@ -492,10 +514,12 @@ public DirectoryListing getListing( } } } - return null; + return finalNamenodeListingExists; }); + } else { + asyncComplete(namenodeListingExists); } - asyncComplete(namenodeListingExists); + asyncApply((ApplyFunction) exists -> { if (!exists && nnListing.size() == 0 && children == null) { // NN returns a null object if the directory cannot be found and has no @@ -519,29 +543,26 @@ public DirectoryListing getListing( @Override protected List> getListingInt( String src, byte[] startAfter, boolean needLocation) throws IOException { - List locations = - rpcServer.getLocationsForPath(src, false, false); - // Locate the dir and fetch the listing. - if (locations.isEmpty()) { - asyncComplete(new ArrayList<>()); - return asyncReturn(List.class); - } - asyncTry(() -> { + try { + List locations = + rpcServer.getLocationsForPath(src, false, false); + // Locate the dir and fetch the listing. + if (locations.isEmpty()) { + asyncComplete(new ArrayList<>()); + return asyncReturn(List.class); + } RemoteMethod method = new RemoteMethod("getListing", new Class[] {String.class, startAfter.getClass(), boolean.class}, new RemoteParam(), startAfter, needLocation); rpcClient.invokeConcurrent(locations, method, false, -1, DirectoryListing.class); - }); - asyncCatch((CatchFunction) (o, e) -> { + } catch (NoLocationException | RouterResolveException e) { LOG.debug("Cannot get locations for {}, {}.", src, e.getMessage()); - LOG.info("Cannot get locations for {}, {}.", src, e.getMessage()); - return new ArrayList<>(); - }, RouterResolveException.class); + asyncComplete(new ArrayList<>()); + } return asyncReturn(List.class); } - @Override public HdfsFileStatus getFileInfo(String src) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); @@ -639,7 +660,7 @@ public HdfsFileStatus getMountPointStatus( final int[] childrenNums = new int[]{childrenNum}; final EnumSet[] flags = new EnumSet[]{EnumSet.noneOf(HdfsFileStatus.Flags.class)}; - asyncComplete(null); + if (getSubclusterResolver() instanceof MountTableResolver) { asyncTry(() -> { String mName = name.startsWith("/") ? name : "/" + name; @@ -666,11 +687,13 @@ public HdfsFileStatus getMountPointStatus( } return fInfo; }); + } else { + asyncComplete(null); } }); asyncCatch((CatchFunction) (status, e) -> { LOG.error("Cannot get mount point: {}", e.getMessage()); - return status; + return null; }, IOException.class); } else { try { @@ -684,6 +707,8 @@ public HdfsFileStatus getMountPointStatus( } else { LOG.debug(msg); } + } finally { + asyncComplete(null); } } long inodeId = 0; @@ -713,8 +738,6 @@ public HdfsFileStatus getMountPointStatus( @Override protected HdfsFileStatus getFileInfoAll(final List locations, final RemoteMethod method, long timeOutMs) throws IOException { - - asyncComplete(null); // Get the file info from everybody rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs, HdfsFileStatus.class); @@ -1086,4 +1109,35 @@ public boolean isMultiDestDirectory(String src) throws IOException { asyncCompleteWith(CompletableFuture.completedFuture(false)); return asyncReturn(Boolean.class); } + + @Override + public Path getEnclosingRoot(String src) throws IOException { + final Path[] mountPath = new Path[1]; + if (defaultNameServiceEnabled) { + mountPath[0] = new Path("/"); + } + + if (subclusterResolver instanceof MountTableResolver) { + MountTableResolver mountTable = (MountTableResolver) subclusterResolver; + if (mountTable.getMountPoint(src) != null) { + mountPath[0] = new Path(mountTable.getMountPoint(src).getSourcePath()); + } + } + + if (mountPath[0] == null) { + throw new IOException(String.format("No mount point for %s", src)); + } + + getEZForPath(src); + asyncApply((ApplyFunction)zone -> { + if (zone == null) { + return mountPath[0]; + } else { + Path zonePath = new Path(zone.getPath()); + return zonePath.depth() > mountPath[0].depth() ? zonePath : mountPath[0]; + } + }); + return asyncReturn(Path.class); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java index cdcac58c6f72f..77cee101523c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java @@ -70,16 +70,16 @@ */ public class TestRouterMountTable { - private static StateStoreDFSCluster cluster; - private static NamenodeContext nnContext0; - private static NamenodeContext nnContext1; - private static RouterContext routerContext; - private static MountTableResolver mountTable; - private static ClientProtocol routerProtocol; - private static long startTime; - private static FileSystem nnFs0; - private static FileSystem nnFs1; - private static FileSystem routerFs; + protected static StateStoreDFSCluster cluster; + protected static NamenodeContext nnContext0; + protected static NamenodeContext nnContext1; + protected static RouterContext routerContext; + protected static MountTableResolver mountTable; + protected static ClientProtocol routerProtocol; + protected static long startTime; + protected static FileSystem nnFs0; + protected static FileSystem nnFs1; + protected static FileSystem routerFs; @BeforeClass public static void globalSetUp() throws Exception { @@ -132,6 +132,7 @@ public void clearMountTable() throws IOException { RemoveMountTableEntryRequest.newInstance(entry.getSourcePath()); mountTableManager.removeMountTableEntry(req2); } + mountTable.setDefaultNSEnable(true); } @Test @@ -178,7 +179,7 @@ public void testReadOnly() throws Exception { * @return If it was succesfully added. * @throws IOException Problems adding entries. */ - private boolean addMountTable(final MountTable entry) throws IOException { + protected boolean addMountTable(final MountTable entry) throws IOException { RouterClient client = routerContext.getAdminClient(); MountTableManager mountTableManager = client.getMountTableManager(); AddMountTableEntryRequest addRequest = @@ -377,10 +378,11 @@ public void testGetMountPointStatus() throws IOException { clientProtocol.getMountPointStatus(childPath2.toString(), 0, 0, false); assertTrue(dirStatus3.isEmptyLocalName()); } + /** * GetListing of testPath through router. */ - private void getListing(String testPath) + protected void getListing(String testPath) throws IOException, URISyntaxException { ClientProtocol clientProtocol1 = routerContext.getClient().getNamenode(); @@ -788,7 +790,6 @@ public void testListStatusMountPoint() throws Exception { @Test public void testGetEnclosingRoot() throws Exception { - // Add a read only entry MountTable readOnlyEntry = MountTable.newInstance( "/readonly", Collections.singletonMap("ns0", "/testdir")); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTable.java new file mode 100644 index 0000000000000..3e3aba7d3821e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTable.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol; +import org.apache.hadoop.hdfs.server.federation.router.TestRouterMountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Time; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collections; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test a router end-to-end including the MountTable using async rpc. + */ +public class TestRouterAsyncMountTable extends TestRouterMountTable { + private static final Logger LOG = + LoggerFactory.getLogger(TestRouterAsyncMountTable.class.getName()); + + @BeforeClass + public static void globalSetUp() throws Exception { + startTime = Time.now(); + + // Build and start a federated cluster + cluster = new StateStoreDFSCluster(false, 2); + Configuration conf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + conf.setInt(RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY, 20); + conf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + cluster.addRouterOverrides(conf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + + // Get the end points + nnContext0 = cluster.getNamenode("ns0", null); + nnContext1 = cluster.getNamenode("ns1", null); + nnFs0 = nnContext0.getFileSystem(); + nnFs1 = nnContext1.getFileSystem(); + routerContext = cluster.getRandomRouter(); + routerFs = routerContext.getFileSystem(); + Router router = routerContext.getRouter(); + routerProtocol = routerContext.getClient().getNamenode(); + mountTable = (MountTableResolver) router.getSubclusterResolver(); + } + + /** + * Verify the getMountPointStatus result of passing in different parameters. + */ + @Override + @Test + public void testGetMountPointStatus() throws IOException { + MountTable addEntry = MountTable.newInstance("/testA/testB/testC/testD", + Collections.singletonMap("ns0", "/testA/testB/testC/testD")); + assertTrue(addMountTable(addEntry)); + RouterClientProtocol clientProtocol = new RouterAsyncClientProtocol( + nnFs0.getConf(), routerContext.getRouter().getRpcServer()); + String src = "/"; + String child = "testA"; + Path childPath = new Path(src, child); + HdfsFileStatus dirStatus; + clientProtocol.getMountPointStatus(childPath.toString(), 0, 0); + try { + dirStatus = syncReturn(HdfsFileStatus.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertEquals(child, dirStatus.getLocalName()); + String src1 = "/testA"; + String child1 = "testB"; + Path childPath1 = new Path(src1, child1); + HdfsFileStatus dirStatus1; + clientProtocol.getMountPointStatus(childPath1.toString(), 0, 0); + try { + dirStatus1 = syncReturn(HdfsFileStatus.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertEquals(child1, dirStatus1.getLocalName()); + + String src2 = "/testA/testB"; + String child2 = "testC"; + Path childPath2 = new Path(src2, child2); + HdfsFileStatus dirStatus2; + clientProtocol.getMountPointStatus(childPath2.toString(), 0, 0); + try { + dirStatus2 = syncReturn(HdfsFileStatus.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertEquals(child2, dirStatus2.getLocalName()); + + HdfsFileStatus dirStatus3; + clientProtocol.getMountPointStatus(childPath2.toString(), 0, 0, false); + try { + dirStatus3 = syncReturn(HdfsFileStatus.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertTrue(dirStatus3.isEmptyLocalName()); + } + + /** + * Validate whether mount point name gets resolved or not. On successful + * resolution the details returned would be the ones actually set on the mount + * point. + */ + @Test + public void testMountPointResolved() throws IOException { + MountTable addEntry = MountTable.newInstance("/testdir", + Collections.singletonMap("ns0", "/tmp/testdir")); + addEntry.setGroupName("group1"); + addEntry.setOwnerName("owner1"); + assertTrue(addMountTable(addEntry)); + HdfsFileStatus finfo = routerProtocol.getFileInfo("/testdir"); + FileStatus[] finfo1 = routerFs.listStatus(new Path("/")); + assertEquals("owner1", finfo.getOwner()); + assertEquals("owner1", finfo1[0].getOwner()); + assertEquals("group1", finfo.getGroup()); + assertEquals("group1", finfo1[0].getGroup()); + } + + @Override + @Test + public void testListNonExistPath() throws Exception { + mountTable.setDefaultNSEnable(false); + LambdaTestUtils.intercept(FileNotFoundException.class, + "File /base does not exist.", + "Expect FileNotFoundException.", + () -> routerFs.listStatus(new Path("/base"))); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 934c5faee2f92..1ff0b9c920875 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -439,7 +439,7 @@ private synchronized void activateVolume( List allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockNames(); for (String dir : allSubDirNameForDataSetLock) { lockManager.addLock(LockLevel.DIR, bp, ref.getVolume().getStorageID(), dir); - LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}", + LOG.debug("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}", bp, ref.getVolume().getStorageID(), dir); } } @@ -3297,7 +3297,7 @@ public void addBlockPool(String bpid, Configuration conf) List allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockNames(); for (String dir : allSubDirNameForDataSetLock) { lockManager.addLock(LockLevel.DIR, bpid, v, dir); - LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}", + LOG.debug("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}", bpid, v, dir); } }