tmp save
hfutatzhanghb committed Mar 2, 2025
1 parent b05c0ce commit 9ec8236
Showing 4 changed files with 272 additions and 44 deletions.
@@ -2225,7 +2225,7 @@ protected static FsPermission getParentPermission(final FsPermission mask) {
* @return New HDFS file status representing a mount point.
*/
@VisibleForTesting
protected HdfsFileStatus getMountPointStatus(
public HdfsFileStatus getMountPointStatus(
String name, int childrenNum, long date) {
return getMountPointStatus(name, childrenNum, date, true);
}
@@ -2240,7 +2240,7 @@ protected HdfsFileStatus getMountPointStatus(
* @return New HDFS file status representing a mount point.
*/
@VisibleForTesting
protected HdfsFileStatus getMountPointStatus(
public HdfsFileStatus getMountPointStatus(
String name, int childrenNum, long date, boolean setPath) {
long modTime = date;
long accessTime = date;
@@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -42,6 +43,7 @@
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
import org.apache.hadoop.hdfs.server.federation.router.NoLocationException;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
@@ -82,6 +84,7 @@
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncFinally;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
@@ -104,6 +107,8 @@ public class RouterAsyncClientProtocol extends RouterClientProtocol {
private final boolean allowPartialList;
/** Time out when getting the mount statistics. */
private long mountStatusTimeOut;
/** Default nameservice enabled. */
private final boolean defaultNameServiceEnabled;
/** Identifier for the super user. */
private String superUser;
/** Identifier for the super group. */
@@ -126,6 +131,9 @@ public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer)
this.mountStatusTimeOut = getMountStatusTimeOut();
this.superUser = getSuperUser();
this.superGroup = getSuperGroup();
this.defaultNameServiceEnabled = conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE,
RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT);
}
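
The new defaultNameServiceEnabled field is read once from the router configuration. A minimal sketch of toggling and reading the same flag with the RBFConfigKeys constants used above (the wrapper class is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;

public final class DefaultNsFlagSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Disable the default nameservice, e.g. to force getEnclosingRoot to rely
    // on mount table entries only.
    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE, false);
    boolean enabled = conf.getBoolean(
        RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE,
        RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT);
    System.out.println("default nameservice enabled = " + enabled);
  }
}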

@Override
@@ -348,7 +356,6 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
return rpcClient.invokeAll(locations, method);
}

asyncComplete(false);
if (locations.size() > 1) {
// Check if this directory already exists
asyncTry(() -> {
@@ -367,14 +374,24 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
src, ex.getMessage());
return false;
}, IOException.class);
}
asyncApply((AsyncApplyFunction<Boolean, Boolean>)ret -> {
final RemoteLocation firstLocation = locations.get(0);
asyncTry(() -> {
rpcClient.invokeSingle(firstLocation, method, Boolean.class);
});

final RemoteLocation firstLocation = locations.get(0);
asyncApply((AsyncApplyFunction<Boolean, Boolean>) success -> {
if (success) {
asyncComplete(true);
return;
}
asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
method, src, ioe, firstLocation, locations);
rpcClient.invokeSequential(
newLocations, method, Boolean.class, Boolean.TRUE);
}, IOException.class);
asyncApply((ApplyFunction<Boolean, Boolean>) success -> {
return success;
});
});
} else {
final RemoteLocation firstLocation = locations.get(0);
asyncTry(() -> {
rpcClient.invokeSingle(firstLocation, method, Boolean.class);
});
@@ -385,7 +402,11 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
rpcClient.invokeSequential(
newLocations, method, Boolean.class, Boolean.TRUE);
}, IOException.class);
});

asyncApply((ApplyFunction<Boolean, Boolean>) success -> {
return success;
});
}

return asyncReturn(Boolean.class);
}
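
In both branches above, the chain invokes mkdirs on the first location and, when that call fails with an IOException, retries the remaining locations sequentially after checkFaultTolerantRetry. A minimal sketch of the same control flow expressed with plain java.util.concurrent.CompletableFuture, as an analogy only (the helper names below are hypothetical and are not the router's AsyncUtil API):

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class MkdirsFallbackSketch {

  // Hypothetical stand-in for invoking mkdirs on the first remote location.
  static CompletableFuture<Boolean> invokeSingle(String location) {
    return CompletableFuture.supplyAsync(() -> true);
  }

  // Hypothetical stand-in for the sequential retry over the fallback locations.
  static CompletableFuture<Boolean> invokeSequential(List<String> locations) {
    return CompletableFuture.completedFuture(Boolean.TRUE);
  }

  static CompletableFuture<Boolean> mkdirs(List<String> locations) {
    String first = locations.get(0);
    return invokeSingle(first)
        // Analogue of asyncCatch(..., IOException.class): only an IOException
        // from the first call triggers the sequential fallback.
        .exceptionallyCompose(err -> {
          if (err.getCause() instanceof IOException) {
            return invokeSequential(locations.subList(1, locations.size()));
          }
          return CompletableFuture.failedFuture(err);
        })
        // Analogue of the trailing asyncApply(success -> success).
        .thenApply(success -> success);
  }
}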
@@ -480,6 +501,7 @@ public DirectoryListing getListing(
return null;
});
});
boolean finalNamenodeListingExists = namenodeListingExists;
asyncApply(o -> {
// Update the remaining count to include left mount points
if (nnListing.size() > 0) {
@@ -492,10 +514,12 @@
}
}
}
return null;
return finalNamenodeListingExists;
});
} else {
asyncComplete(namenodeListingExists);
}
asyncComplete(namenodeListingExists);

asyncApply((ApplyFunction<Boolean, Object>) exists -> {
if (!exists && nnListing.size() == 0 && children == null) {
// NN returns a null object if the directory cannot be found and has no
@@ -519,29 +543,26 @@
@Override
protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
String src, byte[] startAfter, boolean needLocation) throws IOException {
List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, false, false);
// Locate the dir and fetch the listing.
if (locations.isEmpty()) {
asyncComplete(new ArrayList<>());
return asyncReturn(List.class);
}
asyncTry(() -> {
try {
List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, false, false);
// Locate the dir and fetch the listing.
if (locations.isEmpty()) {
asyncComplete(new ArrayList<>());
return asyncReturn(List.class);
}
RemoteMethod method = new RemoteMethod("getListing",
new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
new RemoteParam(), startAfter, needLocation);
rpcClient.invokeConcurrent(locations, method, false, -1,
DirectoryListing.class);
});
asyncCatch((CatchFunction<List, RouterResolveException>) (o, e) -> {
} catch (NoLocationException | RouterResolveException e) {
LOG.debug("Cannot get locations for {}, {}.", src, e.getMessage());
LOG.info("Cannot get locations for {}, {}.", src, e.getMessage());
return new ArrayList<>();
}, RouterResolveException.class);
asyncComplete(new ArrayList<>());
}
return asyncReturn(List.class);
}


@Override
public HdfsFileStatus getFileInfo(String src) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
@@ -639,7 +660,7 @@ public HdfsFileStatus getMountPointStatus(
final int[] childrenNums = new int[]{childrenNum};
final EnumSet<HdfsFileStatus.Flags>[] flags =
new EnumSet[]{EnumSet.noneOf(HdfsFileStatus.Flags.class)};
asyncComplete(null);

if (getSubclusterResolver() instanceof MountTableResolver) {
asyncTry(() -> {
String mName = name.startsWith("/") ? name : "/" + name;
@@ -666,11 +687,13 @@
}
return fInfo;
});
} else {
asyncComplete(null);
}
});
asyncCatch((CatchFunction<HdfsFileStatus, IOException>) (status, e) -> {
LOG.error("Cannot get mount point: {}", e.getMessage());
return status;
return null;
}, IOException.class);
} else {
try {
@@ -684,6 +707,8 @@
} else {
LOG.debug(msg);
}
} finally {
asyncComplete(null);
}
}
long inodeId = 0;
@@ -713,8 +738,6 @@
@Override
protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method, long timeOutMs) throws IOException {

asyncComplete(null);
// Get the file info from everybody
rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs,
HdfsFileStatus.class);
@@ -1086,4 +1109,35 @@ public boolean isMultiDestDirectory(String src) throws IOException {
asyncCompleteWith(CompletableFuture.completedFuture(false));
return asyncReturn(Boolean.class);
}

@Override
public Path getEnclosingRoot(String src) throws IOException {
final Path[] mountPath = new Path[1];
if (defaultNameServiceEnabled) {
mountPath[0] = new Path("/");
}

if (subclusterResolver instanceof MountTableResolver) {
MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
if (mountTable.getMountPoint(src) != null) {
mountPath[0] = new Path(mountTable.getMountPoint(src).getSourcePath());
}
}

if (mountPath[0] == null) {
throw new IOException(String.format("No mount point for %s", src));
}

getEZForPath(src);
asyncApply((ApplyFunction<EncryptionZone, Path>)zone -> {
if (zone == null) {
return mountPath[0];
} else {
Path zonePath = new Path(zone.getPath());
return zonePath.depth() > mountPath[0].depth() ? zonePath : mountPath[0];
}
});
return asyncReturn(Path.class);
}

}
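
The new getEnclosingRoot starts from "/" when the default nameservice is enabled, replaces it with the mount point that covers src, and finally lets an enclosing encryption zone win if it is deeper. A small, self-contained illustration of that depth comparison with hypothetical paths:

import org.apache.hadoop.fs.Path;

public final class EnclosingRootDepthSketch {
  public static void main(String[] args) {
    // Hypothetical layout: mount point /data, encryption zone /data/warehouse/ez.
    Path mountPoint = new Path("/data");               // depth() == 1
    Path zonePath = new Path("/data/warehouse/ez");    // depth() == 3
    // Same rule as the asyncApply above: the deeper of the two is the enclosing root.
    Path enclosingRoot = zonePath.depth() > mountPoint.depth() ? zonePath : mountPoint;
    System.out.println(enclosingRoot);                 // prints /data/warehouse/ez
  }
}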
@@ -70,16 +70,16 @@
*/
public class TestRouterMountTable {

private static StateStoreDFSCluster cluster;
private static NamenodeContext nnContext0;
private static NamenodeContext nnContext1;
private static RouterContext routerContext;
private static MountTableResolver mountTable;
private static ClientProtocol routerProtocol;
private static long startTime;
private static FileSystem nnFs0;
private static FileSystem nnFs1;
private static FileSystem routerFs;
protected static StateStoreDFSCluster cluster;
protected static NamenodeContext nnContext0;
protected static NamenodeContext nnContext1;
protected static RouterContext routerContext;
protected static MountTableResolver mountTable;
protected static ClientProtocol routerProtocol;
protected static long startTime;
protected static FileSystem nnFs0;
protected static FileSystem nnFs1;
protected static FileSystem routerFs;

@BeforeClass
public static void globalSetUp() throws Exception {
@@ -132,6 +132,7 @@ public void clearMountTable() throws IOException {
RemoveMountTableEntryRequest.newInstance(entry.getSourcePath());
mountTableManager.removeMountTableEntry(req2);
}
mountTable.setDefaultNSEnable(true);
}
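
Because clearMountTable now re-enables the default nameservice after every test, an individual test can disable it without leaking state into later tests. A hedged sketch of such a test method inside this class (name and body are illustrative only):

@Test
public void testResolveWithDefaultNameserviceDisabled() throws Exception {
  // Illustrative only: disable the default nameservice for this test;
  // clearMountTable() restores it before the next test runs.
  mountTable.setDefaultNSEnable(false);
  // ... assertions that only paths covered by mount table entries resolve ...
}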

@Test
@@ -178,7 +179,7 @@ public void testReadOnly() throws Exception {
* @return If it was successfully added.
* @throws IOException Problems adding entries.
*/
private boolean addMountTable(final MountTable entry) throws IOException {
protected boolean addMountTable(final MountTable entry) throws IOException {
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
AddMountTableEntryRequest addRequest =
@@ -377,10 +378,11 @@ public void testGetMountPointStatus() throws IOException {
clientProtocol.getMountPointStatus(childPath2.toString(), 0, 0, false);
assertTrue(dirStatus3.isEmptyLocalName());
}

/**
* GetListing of testPath through router.
*/
private void getListing(String testPath)
protected void getListing(String testPath)
throws IOException, URISyntaxException {
ClientProtocol clientProtocol1 =
routerContext.getClient().getNamenode();
@@ -788,7 +790,6 @@ public void testListStatusMountPoint() throws Exception {

@Test
public void testGetEnclosingRoot() throws Exception {

// Add a read only entry
MountTable readOnlyEntry = MountTable.newInstance(
"/readonly", Collections.singletonMap("ns0", "/testdir"));
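
The rest of testGetEnclosingRoot is collapsed in this view; a hedged sketch of the kind of assertion such a test typically makes through the router (illustrative values, not the hidden diff content):

// Illustrative only: assumes the /readonly -> ns0:/testdir entry above was added
// and that getEnclosingRoot resolves to the deepest mount point covering the path.
assertEquals(new Path("/readonly"),
    routerProtocol.getEnclosingRoot("/readonly/dir"));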
