Skip to content

Commit

Permalink
HDDS-10983. Fix Acceptance test
Browse files Browse the repository at this point in the history
Change-Id: Id81b80a6c4d6137ac287022cd9386ad29ca70194
  • Loading branch information
swamirishi committed Jun 11, 2024
1 parent 5857567 commit dd11beb
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,14 @@ protected List<ChunkInfo> getChunkInfoList() throws IOException {

@VisibleForTesting
protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
Pipeline pipeline = pipelineRef.get();
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing BlockInputStream for get key to access {} with pipeline {}.",
blockID.getContainerID(), xceiverClient.getPipeline());
blockID.getContainerID(), pipeline);
}

GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
xceiverClient, VALIDATORS, blockID, tokenRef.get());
xceiverClient, VALIDATORS, blockID, tokenRef.get(), pipeline.getReplicaIndexes());

return response.getBlockData().getChunksList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -239,6 +241,14 @@ public int getReplicaIndex(DatanodeDetails dn) {
return replicaIndexes.getOrDefault(dn, 0);
}

/**
* Get the replicaIndex Map.
* @return
*/
public Map<DatanodeDetails, Integer> getReplicaIndexes() {
return this.getNodes().stream().collect(Collectors.toMap(Function.identity(), this::getReplicaIndex));
}

/**
* Returns the leader if found else defaults to closest node.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ static <T> T tryEachDatanode(Pipeline pipeline,
* @throws IOException if there is an I/O error while performing the call
*/
public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
List<Validator> validators, BlockID blockID, Token<? extends TokenIdentifier> token) throws IOException {
List<Validator> validators, BlockID blockID, Token<? extends TokenIdentifier> token,
Map<DatanodeDetails, Integer> replicaIndexes) throws IOException {
ContainerCommandRequestProto.Builder builder = getContainerCommandRequestProtoBuilder()
.setCmdType(Type.GetBlock)
.setContainerID(blockID.getContainerID());
Expand All @@ -221,7 +222,7 @@ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
}

return tryEachDatanode(xceiverClient.getPipeline(),
d -> getBlock(xceiverClient, validators, builder, blockID, d),
d -> getBlock(xceiverClient, validators, builder, blockID, d, replicaIndexes),
d -> toErrorMessage(blockID, d));
}

Expand All @@ -231,19 +232,20 @@ static String toErrorMessage(BlockID blockId, DatanodeDetails d) {
}

public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient,
BlockID datanodeBlockID,
Token<? extends TokenIdentifier> token) throws IOException {
return getBlock(xceiverClient, getValidatorList(), datanodeBlockID, token);
BlockID datanodeBlockID, Token<? extends TokenIdentifier> token,
Map<DatanodeDetails, Integer> replicaIndexes) throws IOException {
return getBlock(xceiverClient, getValidatorList(), datanodeBlockID, token, replicaIndexes);
}

private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, List<Validator> validators,
ContainerCommandRequestProto.Builder builder, BlockID blockID, DatanodeDetails datanode) throws IOException {
ContainerCommandRequestProto.Builder builder, BlockID blockID, DatanodeDetails datanode,
Map<DatanodeDetails, Integer> replicaIndexes) throws IOException {
String traceId = TracingUtil.exportCurrentSpan();
if (traceId != null) {
builder.setTraceID(traceId);
}
final DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder();
int replicaIndex = xceiverClient.getPipeline().getReplicaIndex(datanode);
int replicaIndex = replicaIndexes.getOrDefault(datanode, 0);
if (replicaIndex > 0) {
datanodeBlockID.setReplicaIndex(replicaIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private List<ContainerProtos.ChunkInfo> getChunkInfos(OmKeyLocationInfo
xceiverClientSpi = getXceiverClientFactory().acquireClientForReadData(pipeline);

ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClientSpi, blockID, token);
.getBlock(xceiverClientSpi, blockID, token, pipeline.getReplicaIndexes());

chunks = response.getBlockData().getChunksList();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected List<ContainerProtos.ChunkInfo> getChunkInfos(
}
xceiverClientSpi = getXceiverClientFactory().acquireClientForReadData(pipeline);
ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls
.getBlock(xceiverClientSpi, blockID, token);
.getBlock(xceiverClientSpi, blockID, token, pipeline.getReplicaIndexes());

chunks = response.getBlockData().getChunksList();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void invokeXceiverClientGetBlock(XceiverClientSpi client)
.setContainerID(1)
.setLocalID(1)
.setBlockCommitSequenceId(1)
.build()), null);
.build()), null, client.getPipeline().getReplicaIndexes());
}

private void invokeXceiverClientReadChunk(XceiverClientSpi client)
Expand Down

0 comments on commit dd11beb

Please sign in to comment.