Skip to content

Commit

Permalink
tmp save
Browse files Browse the repository at this point in the history
  • Loading branch information
hfutatzhanghb committed Mar 4, 2025
1 parent b05c0ce commit 9ec776f
Show file tree
Hide file tree
Showing 5 changed files with 307 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2225,7 +2225,7 @@ protected static FsPermission getParentPermission(final FsPermission mask) {
* @return New HDFS file status representing a mount point.
*/
@VisibleForTesting
protected HdfsFileStatus getMountPointStatus(
public HdfsFileStatus getMountPointStatus(
String name, int childrenNum, long date) {
return getMountPointStatus(name, childrenNum, date, true);
}
Expand All @@ -2240,7 +2240,7 @@ protected HdfsFileStatus getMountPointStatus(
* @return New HDFS file status representing a mount point.
*/
@VisibleForTesting
protected HdfsFileStatus getMountPointStatus(
public HdfsFileStatus getMountPointStatus(
String name, int childrenNum, long date, boolean setPath) {
long modTime = date;
long accessTime = date;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
Expand All @@ -42,6 +43,7 @@
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
import org.apache.hadoop.hdfs.server.federation.router.NoLocationException;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
Expand Down Expand Up @@ -82,6 +84,7 @@
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncFinally;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
Expand All @@ -104,6 +107,8 @@ public class RouterAsyncClientProtocol extends RouterClientProtocol {
private final boolean allowPartialList;
/** Time out when getting the mount statistics. */
private long mountStatusTimeOut;
/** Default nameservice enabled. */
private final boolean defaultNameServiceEnabled;
/** Identifier for the super user. */
private String superUser;
/** Identifier for the super group. */
Expand All @@ -126,6 +131,9 @@ public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer)
this.mountStatusTimeOut = getMountStatusTimeOut();
this.superUser = getSuperUser();
this.superGroup = getSuperGroup();
this.defaultNameServiceEnabled = conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE,
RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT);
}

@Override
Expand Down Expand Up @@ -348,7 +356,6 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
return rpcClient.invokeAll(locations, method);
}

asyncComplete(false);
if (locations.size() > 1) {
// Check if this directory already exists
asyncTry(() -> {
Expand All @@ -367,14 +374,23 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
src, ex.getMessage());
return false;
}, IOException.class);
}
asyncApply((AsyncApplyFunction<Boolean, Boolean>)ret -> {
if (!ret) {
final RemoteLocation firstLocation = locations.get(0);
asyncTry(() -> {
rpcClient.invokeSingle(firstLocation, method, Boolean.class);
});

final RemoteLocation firstLocation = locations.get(0);
asyncApply((AsyncApplyFunction<Boolean, Boolean>) success -> {
if (success) {
asyncComplete(true);
return;
}
asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
method, src, ioe, firstLocation, locations);
rpcClient.invokeSequential(
newLocations, method, Boolean.class, Boolean.TRUE);
}, IOException.class);
}
});
} else {
final RemoteLocation firstLocation = locations.get(0);
asyncTry(() -> {
rpcClient.invokeSingle(firstLocation, method, Boolean.class);
});
Expand All @@ -385,7 +401,7 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
rpcClient.invokeSequential(
newLocations, method, Boolean.class, Boolean.TRUE);
}, IOException.class);
});
}

return asyncReturn(Boolean.class);
}
Expand Down Expand Up @@ -480,6 +496,7 @@ public DirectoryListing getListing(
return null;
});
});
boolean finalNamenodeListingExists = namenodeListingExists;
asyncApply(o -> {
// Update the remaining count to include left mount points
if (nnListing.size() > 0) {
Expand All @@ -492,10 +509,12 @@ public DirectoryListing getListing(
}
}
}
return null;
return finalNamenodeListingExists;
});
} else {
asyncComplete(namenodeListingExists);
}
asyncComplete(namenodeListingExists);

asyncApply((ApplyFunction<Boolean, Object>) exists -> {
if (!exists && nnListing.size() == 0 && children == null) {
// NN returns a null object if the directory cannot be found and has no
Expand All @@ -519,29 +538,26 @@ public DirectoryListing getListing(
@Override
protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
String src, byte[] startAfter, boolean needLocation) throws IOException {
List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, false, false);
// Locate the dir and fetch the listing.
if (locations.isEmpty()) {
asyncComplete(new ArrayList<>());
return asyncReturn(List.class);
}
asyncTry(() -> {
try {
List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, false, false);
// Locate the dir and fetch the listing.
if (locations.isEmpty()) {
asyncComplete(new ArrayList<>());
return asyncReturn(List.class);
}
RemoteMethod method = new RemoteMethod("getListing",
new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
new RemoteParam(), startAfter, needLocation);
rpcClient.invokeConcurrent(locations, method, false, -1,
DirectoryListing.class);
});
asyncCatch((CatchFunction<List, RouterResolveException>) (o, e) -> {
} catch (NoLocationException | RouterResolveException e) {
LOG.debug("Cannot get locations for {}, {}.", src, e.getMessage());
LOG.info("Cannot get locations for {}, {}.", src, e.getMessage());
return new ArrayList<>();
}, RouterResolveException.class);
asyncComplete(new ArrayList<>());
}
return asyncReturn(List.class);
}


@Override
public HdfsFileStatus getFileInfo(String src) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
Expand Down Expand Up @@ -639,7 +655,14 @@ public HdfsFileStatus getMountPointStatus(
final int[] childrenNums = new int[]{childrenNum};
final EnumSet<HdfsFileStatus.Flags>[] flags =
new EnumSet[]{EnumSet.noneOf(HdfsFileStatus.Flags.class)};
asyncComplete(null);
long inodeId = 0;
HdfsFileStatus.Builder builder = new HdfsFileStatus.Builder();
if (setPath) {
Path path = new Path(name);
String nameStr = path.getName();
builder.path(DFSUtil.string2Bytes(nameStr));
}

if (getSubclusterResolver() instanceof MountTableResolver) {
asyncTry(() -> {
String mName = name.startsWith("/") ? name : "/" + name;
Expand All @@ -664,13 +687,35 @@ public HdfsFileStatus getMountPointStatus(
.getFlags(fInfo.isEncrypted(), fInfo.isErasureCoded(),
fInfo.isSnapshotEnabled(), fInfo.hasAcl());
}
return fInfo;
return builder.isdir(true)
.mtime(modTime)
.atime(accessTime)
.perm(permission[0])
.owner(owner[0])
.group(group[0])
.symlink(new byte[0])
.fileId(inodeId)
.children(childrenNums[0])
.flags(flags[0])
.build();
});
} else {
asyncComplete(builder.isdir(true)
.mtime(modTime)
.atime(accessTime)
.perm(permission[0])
.owner(owner[0])
.group(group[0])
.symlink(new byte[0])
.fileId(inodeId)
.children(childrenNums[0])
.flags(flags[0])
.build());
}
});
asyncCatch((CatchFunction<HdfsFileStatus, IOException>) (status, e) -> {
LOG.error("Cannot get mount point: {}", e.getMessage());
return status;
return null;
}, IOException.class);
} else {
try {
Expand All @@ -684,37 +729,27 @@ public HdfsFileStatus getMountPointStatus(
} else {
LOG.debug(msg);
}
} finally {
asyncComplete(builder.isdir(true)
.mtime(modTime)
.atime(accessTime)
.perm(permission[0])
.owner(owner[0])
.group(group[0])
.symlink(new byte[0])
.fileId(inodeId)
.children(childrenNums[0])
.flags(flags[0])
.build());
}
}
long inodeId = 0;
HdfsFileStatus.Builder builder = new HdfsFileStatus.Builder();
asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) status -> {
if (setPath) {
Path path = new Path(name);
String nameStr = path.getName();
builder.path(DFSUtil.string2Bytes(nameStr));
}

return builder.isdir(true)
.mtime(modTime)
.atime(accessTime)
.perm(permission[0])
.owner(owner[0])
.group(group[0])
.symlink(new byte[0])
.fileId(inodeId)
.children(childrenNums[0])
.flags(flags[0])
.build();
});
return asyncReturn(HdfsFileStatus.class);
}

@Override
protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method, long timeOutMs) throws IOException {

asyncComplete(null);
// Get the file info from everybody
rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs,
HdfsFileStatus.class);
Expand Down Expand Up @@ -1086,4 +1121,35 @@ public boolean isMultiDestDirectory(String src) throws IOException {
asyncCompleteWith(CompletableFuture.completedFuture(false));
return asyncReturn(Boolean.class);
}

@Override
public Path getEnclosingRoot(String src) throws IOException {
final Path[] mountPath = new Path[1];
if (defaultNameServiceEnabled) {
mountPath[0] = new Path("/");
}

if (subclusterResolver instanceof MountTableResolver) {
MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
if (mountTable.getMountPoint(src) != null) {
mountPath[0] = new Path(mountTable.getMountPoint(src).getSourcePath());
}
}

if (mountPath[0] == null) {
throw new IOException(String.format("No mount point for %s", src));
}

getEZForPath(src);
asyncApply((ApplyFunction<EncryptionZone, Path>)zone -> {
if (zone == null) {
return mountPath[0];
} else {
Path zonePath = new Path(zone.getPath());
return zonePath.depth() > mountPath[0].depth() ? zonePath : mountPath[0];
}
});
return asyncReturn(Path.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,16 @@
*/
public class TestRouterMountTable {

private static StateStoreDFSCluster cluster;
private static NamenodeContext nnContext0;
private static NamenodeContext nnContext1;
private static RouterContext routerContext;
private static MountTableResolver mountTable;
private static ClientProtocol routerProtocol;
private static long startTime;
private static FileSystem nnFs0;
private static FileSystem nnFs1;
private static FileSystem routerFs;
protected static StateStoreDFSCluster cluster;
protected static NamenodeContext nnContext0;
protected static NamenodeContext nnContext1;
protected static RouterContext routerContext;
protected static MountTableResolver mountTable;
protected static ClientProtocol routerProtocol;
protected static long startTime;
protected static FileSystem nnFs0;
protected static FileSystem nnFs1;
protected static FileSystem routerFs;

@BeforeClass
public static void globalSetUp() throws Exception {
Expand Down Expand Up @@ -132,6 +132,7 @@ public void clearMountTable() throws IOException {
RemoveMountTableEntryRequest.newInstance(entry.getSourcePath());
mountTableManager.removeMountTableEntry(req2);
}
mountTable.setDefaultNSEnable(true);
}

@Test
Expand Down Expand Up @@ -178,7 +179,7 @@ public void testReadOnly() throws Exception {
* @return If it was succesfully added.
* @throws IOException Problems adding entries.
*/
private boolean addMountTable(final MountTable entry) throws IOException {
protected boolean addMountTable(final MountTable entry) throws IOException {
RouterClient client = routerContext.getAdminClient();
MountTableManager mountTableManager = client.getMountTableManager();
AddMountTableEntryRequest addRequest =
Expand Down Expand Up @@ -377,10 +378,11 @@ public void testGetMountPointStatus() throws IOException {
clientProtocol.getMountPointStatus(childPath2.toString(), 0, 0, false);
assertTrue(dirStatus3.isEmptyLocalName());
}

/**
* GetListing of testPath through router.
*/
private void getListing(String testPath)
protected void getListing(String testPath)
throws IOException, URISyntaxException {
ClientProtocol clientProtocol1 =
routerContext.getClient().getNamenode();
Expand Down Expand Up @@ -788,7 +790,6 @@ public void testListStatusMountPoint() throws Exception {

@Test
public void testGetEnclosingRoot() throws Exception {

// Add a read only entry
MountTable readOnlyEntry = MountTable.newInstance(
"/readonly", Collections.singletonMap("ns0", "/testdir"));
Expand Down
Loading

0 comments on commit 9ec776f

Please sign in to comment.