diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index c5487832826..9d448c6fe36 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -116,7 +116,7 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, Token token, XceiverClientFactory xceiverClientFactory, Function refreshFunction, - OzoneClientConfig config) { + OzoneClientConfig config) throws IOException { this.blockID = blockId; this.length = blockLen; setPipeline(pipeline); @@ -133,7 +133,7 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline, Token token, XceiverClientFactory xceiverClientFactory, OzoneClientConfig config - ) { + ) throws IOException { this(blockId, blockLen, pipeline, token, xceiverClientFactory, null, config); } @@ -244,33 +244,28 @@ protected List getChunkInfoList() throws IOException { @VisibleForTesting protected List getChunkInfoListUsingClient() throws IOException { - final Pipeline pipeline = xceiverClient.getPipeline(); - + Pipeline pipeline = pipelineRef.get(); if (LOG.isDebugEnabled()) { - LOG.debug("Initializing BlockInputStream for get key to access {}", - blockID.getContainerID()); - } - - DatanodeBlockID.Builder blkIDBuilder = - DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID()) - .setLocalID(blockID.getLocalID()) - .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId()); - - int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode()); - if (replicaIndex > 0) { - blkIDBuilder.setReplicaIndex(replicaIndex); + LOG.debug("Initializing BlockInputStream for get key to access {} with pipeline {}.", + blockID.getContainerID(), pipeline); } GetBlockResponseProto response = ContainerProtocolCalls.getBlock( - xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get()); + xceiverClient, VALIDATORS, blockID, tokenRef.get(), pipeline.getReplicaIndexes()); return response.getBlockData().getChunksList(); } - private void setPipeline(Pipeline pipeline) { + private void setPipeline(Pipeline pipeline) throws IOException { if (pipeline == null) { return; } + long replicaIndexes = pipeline.getNodes().stream().mapToInt(pipeline::getReplicaIndex).distinct().count(); + + if (replicaIndexes > 1) { + throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.", + pipeline)); + } // irrespective of the container state, we will always read via Standalone // protocol. 
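[Editor's sketch, not part of the patch] The client-side changes above introduce two replica-index rules: a read pipeline must map all of its nodes to a single replica index (the new setPipeline() check), and only a strictly positive per-datanode index is stamped onto the DatanodeBlockID sent to a datanode. Below is a minimal, self-contained Java illustration of both rules; a plain Map stands in for Pipeline#getReplicaIndexes(), and every name in it is illustrative rather than an Ozone API.

import java.io.IOException;
import java.util.Map;

final class ReplicaIndexRulesSketch {

  // Rule 1, mirroring the new setPipeline() guard in BlockInputStream: a read
  // pipeline must not mix replica indexes, because one stream reads one replica.
  static void requireSingleReplicaIndex(Map<String, Integer> replicaIndexes) throws IOException {
    long distinct = replicaIndexes.values().stream().distinct().count();
    if (distinct > 1) {
      throw new IOException(String.format(
          "Pipeline: %s has nodes containing different replica indexes.", replicaIndexes));
    }
  }

  // Rule 2, mirroring the per-datanode stamping in ContainerProtocolCalls#getBlock:
  // only an index > 0 is written into the request; 0 means "unset", so non-EC
  // reads leave the proto field absent (modelled as null here).
  static Integer replicaIndexToSend(Map<String, Integer> replicaIndexes, String datanode) {
    int idx = replicaIndexes.getOrDefault(datanode, 0);
    return idx > 0 ? idx : null;
  }
}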
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index b30f555795b..983bb74989a 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -26,6 +26,8 @@ import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; @@ -60,6 +62,7 @@ public class ChunkInputStream extends InputStream private final ChunkInfo chunkInfo; private final long length; private final BlockID blockID; + private ContainerProtos.DatanodeBlockID datanodeBlockID; private final XceiverClientFactory xceiverClientFactory; private XceiverClientSpi xceiverClient; private final Supplier pipelineSupplier; @@ -290,13 +293,27 @@ protected synchronized void releaseClient() { } } + /** + * Updates the datanodeBlockID based on the blockID and the replica index of the pipeline's closest node. + */ + private void updateDatanodeBlockId(Pipeline pipeline) throws IOException { + DatanodeDetails closestNode = pipeline.getClosestNode(); + int replicaIdx = pipeline.getReplicaIndex(closestNode); + ContainerProtos.DatanodeBlockID.Builder builder = blockID.getDatanodeBlockIDProtobufBuilder(); + if (replicaIdx > 0) { + builder.setReplicaIndex(replicaIdx); + } + datanodeBlockID = builder.build(); + } + /** * Acquire new client if previous one was released.
*/ protected synchronized void acquireClient() throws IOException { if (xceiverClientFactory != null && xceiverClient == null) { - xceiverClient = xceiverClientFactory.acquireClientForReadData( - pipelineSupplier.get()); + Pipeline pipeline = pipelineSupplier.get(); + xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline); + updateDatanodeBlockId(pipeline); } } @@ -422,8 +439,8 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) throws IOException { ReadChunkResponseProto readChunkResponse = - ContainerProtocolCalls.readChunk(xceiverClient, - readChunkInfo, blockID, validators, tokenSupplier.get()); + ContainerProtocolCalls.readChunk(xceiverClient, readChunkInfo, datanodeBlockID, validators, + tokenSupplier.get()); if (readChunkResponse.hasData()) { return readChunkResponse.getData().asReadOnlyByteBufferList() diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java index 6f8a744f762..d347dee8512 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; import org.apache.hadoop.security.token.Token; +import java.io.IOException; import java.util.function.Function; /** @@ -52,6 +53,6 @@ BlockExtendedInputStream create(ReplicationConfig repConfig, Token token, XceiverClientFactory xceiverFactory, Function refreshFunction, - OzoneClientConfig config); + OzoneClientConfig config) throws IOException; } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java index 6bcdc3c4811..19ea76fa706 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java @@ -32,6 +32,7 @@ import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.security.token.Token; +import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Function; @@ -80,7 +81,7 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig, Token token, XceiverClientFactory xceiverFactory, Function refreshFunction, - OzoneClientConfig config) { + OzoneClientConfig config) throws IOException { if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) { return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig, blockInfo, xceiverFactory, refreshFunction, diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java index 8dc07f129b9..6342de2c338 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java @@ -164,7 +164,7 @@ protected int currentStreamIndex() { * stream if it has not been opened already. * @return BlockInput stream to read from. 
*/ - protected BlockExtendedInputStream getOrOpenStream(int locationIndex) { + protected BlockExtendedInputStream getOrOpenStream(int locationIndex) throws IOException { BlockExtendedInputStream stream = blockStreams[locationIndex]; if (stream == null) { // To read an EC block, we create a STANDALONE pipeline that contains the @@ -176,8 +176,8 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) { .setReplicationConfig(StandaloneReplicationConfig.getInstance( HddsProtos.ReplicationFactor.ONE)) .setNodes(Arrays.asList(dataLocation)) - .setId(PipelineID.valueOf(dataLocation.getUuid())).setReplicaIndexes( - ImmutableMap.of(dataLocation, locationIndex + 1)) + .setId(PipelineID.valueOf(dataLocation.getUuid())) + .setReplicaIndexes(ImmutableMap.of(dataLocation, locationIndex + 1)) .setState(Pipeline.PipelineState.CLOSED) .build(); @@ -228,6 +228,7 @@ protected Function ecPipelineRefreshFunction( HddsProtos.ReplicationFactor.ONE)) .setNodes(Collections.singletonList(curIndexNode)) .setId(PipelineID.randomId()) + .setReplicaIndexes(Collections.singletonMap(curIndexNode, replicaIndex)) .setState(Pipeline.PipelineState.CLOSED) .build(); blockLocationInfo.setPipeline(pipeline); diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java index a89097533d2..46ce89da0f8 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java @@ -49,7 +49,7 @@ class DummyBlockInputStream extends BlockInputStream { Function refreshFunction, List chunkList, Map chunks, - OzoneClientConfig config) { + OzoneClientConfig config) throws IOException { super(blockId, blockLen, pipeline, token, xceiverClientManager, refreshFunction, config); this.chunkDataMap = chunks; diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java index 6d12614228f..6ab31f8c372 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java @@ -56,7 +56,7 @@ final class DummyBlockInputStreamWithRetry List chunkList, Map chunkMap, AtomicBoolean isRerfreshed, IOException ioException, - OzoneClientConfig config) { + OzoneClientConfig config) throws IOException { super(blockId, blockLen, pipeline, token, xceiverClientManager, blockID -> { isRerfreshed.set(true); diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java index 21b088ce85f..3aabe5f67ad 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java @@ -355,7 +355,7 @@ private static ChunkInputStream throwingChunkInputStream(IOException ex, } private BlockInputStream createSubject(BlockID blockID, Pipeline pipeline, - ChunkInputStream stream) { + ChunkInputStream stream) throws IOException { OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); 
clientConfig.setChecksumVerify(false); return new DummyBlockInputStream(blockID, blockSize, pipeline, null, diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java index 623f7a4f86f..dda631372cb 100644 --- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestBlockInputStreamFactoryImpl.java @@ -32,13 +32,16 @@ import org.apache.hadoop.hdds.scm.storage.BlockInputStream; import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.mockito.ArgumentMatchers.any; /** * Tests for BlockInputStreamFactoryImpl. */ @@ -48,14 +51,16 @@ public class TestBlockInputStreamFactoryImpl { private OzoneConfiguration conf = new OzoneConfiguration(); @Test - public void testNonECGivesBlockInputStream() { + public void testNonECGivesBlockInputStream() throws IOException { BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl(); ReplicationConfig repConfig = RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE); BlockLocationInfo blockInfo = createKeyLocationInfo(repConfig, 3, 1024 * 1024 * 10); - + Pipeline pipeline = Mockito.spy(blockInfo.getPipeline()); + blockInfo.setPipeline(pipeline); + Mockito.when(pipeline.getReplicaIndex(any(DatanodeDetails.class))).thenReturn(1); OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); clientConfig.setChecksumVerify(true); BlockExtendedInputStream stream = @@ -68,7 +73,7 @@ public void testNonECGivesBlockInputStream() { } @Test - public void testECGivesECBlockInputStream() { + public void testECGivesECBlockInputStream() throws IOException { BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl(); ReplicationConfig repConfig = new ECReplicationConfig(3, 2); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java index 8540a0c5ab8..e3e3c3fa9ec 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/client/BlockID.java @@ -23,30 +23,41 @@ import java.util.Objects; /** - * BlockID of Ozone (containerID + localID + blockCommitSequenceId). + * BlockID of Ozone (containerID + localID + blockCommitSequenceId + replicaIndex). */ public class BlockID { private final ContainerBlockID containerBlockID; private long blockCommitSequenceId; + // Null when not set via the private constructor (this avoids confusing replica index 0 with an unset value). + // This value is only set when deserializing from ContainerProtos.DatanodeBlockID or copying from another + // BlockID object.
+ private final Integer replicaIndex; public BlockID(long containerID, long localID) { - this(containerID, localID, 0); + this(containerID, localID, 0, null); } - private BlockID(long containerID, long localID, long bcsID) { + private BlockID(long containerID, long localID, long bcsID, Integer repIndex) { containerBlockID = new ContainerBlockID(containerID, localID); blockCommitSequenceId = bcsID; + this.replicaIndex = repIndex; + } + + public BlockID(BlockID blockID) { + this(blockID.getContainerID(), blockID.getLocalID(), blockID.getBlockCommitSequenceId(), + blockID.getReplicaIndex()); } public BlockID(ContainerBlockID containerBlockID) { - this(containerBlockID, 0); + this(containerBlockID, 0, null); } - private BlockID(ContainerBlockID containerBlockID, long bcsId) { + private BlockID(ContainerBlockID containerBlockID, long bcsId, Integer repIndex) { this.containerBlockID = containerBlockID; blockCommitSequenceId = bcsId; + this.replicaIndex = repIndex; } public long getContainerID() { @@ -65,6 +76,11 @@ public void setBlockCommitSequenceId(long blockCommitSequenceId) { this.blockCommitSequenceId = blockCommitSequenceId; } + // Can return a null value in case it is not set. + public Integer getReplicaIndex() { + return replicaIndex; + } + public ContainerBlockID getContainerBlockID() { return containerBlockID; } @@ -79,21 +95,32 @@ public String toString() { public void appendTo(StringBuilder sb) { containerBlockID.appendTo(sb); sb.append(" bcsId: ").append(blockCommitSequenceId); + sb.append(" replicaIndex: ").append(replicaIndex); } @JsonIgnore public ContainerProtos.DatanodeBlockID getDatanodeBlockIDProtobuf() { + ContainerProtos.DatanodeBlockID.Builder blockID = getDatanodeBlockIDProtobufBuilder(); + if (replicaIndex != null) { + blockID.setReplicaIndex(replicaIndex); + } + return blockID.build(); + } + + @JsonIgnore + public ContainerProtos.DatanodeBlockID.Builder getDatanodeBlockIDProtobufBuilder() { return ContainerProtos.DatanodeBlockID.newBuilder(). setContainerID(containerBlockID.getContainerID()) .setLocalID(containerBlockID.getLocalID()) - .setBlockCommitSequenceId(blockCommitSequenceId).build(); + .setBlockCommitSequenceId(blockCommitSequenceId); } @JsonIgnore - public static BlockID getFromProtobuf( - ContainerProtos.DatanodeBlockID blockID) { + public static BlockID getFromProtobuf(ContainerProtos.DatanodeBlockID blockID) { return new BlockID(blockID.getContainerID(), - blockID.getLocalID(), blockID.getBlockCommitSequenceId()); + blockID.getLocalID(), + blockID.getBlockCommitSequenceId(), + blockID.hasReplicaIndex() ? 
blockID.getReplicaIndex() : null); } @JsonIgnore @@ -107,7 +134,7 @@ public HddsProtos.BlockID getProtobuf() { public static BlockID getFromProtobuf(HddsProtos.BlockID blockID) { return new BlockID( ContainerBlockID.getFromProtobuf(blockID.getContainerBlockID()), - blockID.getBlockCommitSequenceId()); + blockID.getBlockCommitSequenceId(), null); } @Override @@ -119,14 +146,14 @@ public boolean equals(Object o) { return false; } BlockID blockID = (BlockID) o; - return containerBlockID.equals(blockID.getContainerBlockID()) - && blockCommitSequenceId == blockID.getBlockCommitSequenceId(); + return this.getContainerBlockID().equals(blockID.getContainerBlockID()) + && this.getBlockCommitSequenceId() == blockID.getBlockCommitSequenceId() + && Objects.equals(this.getReplicaIndex(), blockID.getReplicaIndex()); } @Override public int hashCode() { - return Objects - .hash(containerBlockID.getContainerID(), containerBlockID.getLocalID(), - blockCommitSequenceId); + return Objects.hash(containerBlockID.getContainerID(), containerBlockID.getLocalID(), + blockCommitSequenceId, replicaIndex); } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index 6ea92f74c19..1486f05f55c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -31,6 +31,8 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.ImmutableList; @@ -239,6 +241,14 @@ public int getReplicaIndex(DatanodeDetails dn) { return replicaIndexes.getOrDefault(dn, 0); } + /** + * Get the replicaIndex Map. + * @return map from each node in the pipeline to its replica index + */ + public Map getReplicaIndexes() { + return this.getNodes().stream().collect(Collectors.toMap(Function.identity(), this::getReplicaIndex)); + } + /** * Returns the leader if found else defaults to closest node. * @@ -509,7 +519,10 @@ public String toString() { new StringBuilder(getClass().getSimpleName()).append("["); b.append(" Id: ").append(id.getId()); b.append(", Nodes: "); - nodeStatus.keySet().forEach(b::append); + for (DatanodeDetails datanodeDetails : nodeStatus.keySet()) { + b.append(datanodeDetails); + b.append(" ReplicaIndex: ").append(this.getReplicaIndex(datanodeDetails)); + } b.append(", ReplicationConfig: ").append(replicationConfig); b.append(", State:").append(getPipelineState()); b.append(", leaderId:").append(leaderId != null ?
leaderId.toString() : ""); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 66c8459a01a..659ddf2738b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -183,53 +183,56 @@ static T tryEachDatanode(Pipeline pipeline, * * @param xceiverClient client to perform call * @param validators functions to validate the response - * @param datanodeBlockID blockID to identify container + * @param blockID blockID to identify container * @param token a token for this block (may be null) * @return container protocol get block response * @throws IOException if there is an I/O error while performing the call */ public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, - List validators, - DatanodeBlockID datanodeBlockID, - Token token) throws IOException { - GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto - .newBuilder() - .setBlockID(datanodeBlockID); + List validators, BlockID blockID, Token token, + Map replicaIndexes) throws IOException { ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto .newBuilder() .setCmdType(Type.GetBlock) - .setContainerID(datanodeBlockID.getContainerID()) - .setGetBlock(readBlockRequest); + .setContainerID(blockID.getContainerID()); if (token != null) { builder.setEncodedToken(token.encodeToUrlString()); } return tryEachDatanode(xceiverClient.getPipeline(), - d -> getBlock(xceiverClient, validators, builder, d), - d -> toErrorMessage(datanodeBlockID, d)); + d -> getBlock(xceiverClient, validators, builder, blockID, d, replicaIndexes), + d -> toErrorMessage(blockID, d)); } - static String toErrorMessage(DatanodeBlockID blockId, DatanodeDetails d) { + static String toErrorMessage(BlockID blockId, DatanodeDetails d) { return String.format("Failed to get block #%s in container #%s from %s", blockId.getLocalID(), blockId.getContainerID(), d); } public static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, - DatanodeBlockID datanodeBlockID, - Token token) throws IOException { - return getBlock(xceiverClient, getValidatorList(), datanodeBlockID, token); + BlockID datanodeBlockID, + Token token, Map replicaIndexes) throws IOException { + return getBlock(xceiverClient, getValidatorList(), datanodeBlockID, token, replicaIndexes); } private static GetBlockResponseProto getBlock(XceiverClientSpi xceiverClient, List validators, - ContainerCommandRequestProto.Builder builder, - DatanodeDetails datanode) throws IOException { + ContainerCommandRequestProto.Builder builder, BlockID blockID, + DatanodeDetails datanode, Map replicaIndexes) throws IOException { String traceId = TracingUtil.exportCurrentSpan(); if (traceId != null) { builder.setTraceID(traceId); } + final DatanodeBlockID.Builder datanodeBlockID = blockID.getDatanodeBlockIDProtobufBuilder(); + int replicaIndex = replicaIndexes.getOrDefault(datanode, 0); + if (replicaIndex > 0) { + datanodeBlockID.setReplicaIndex(replicaIndex); + } + final GetBlockRequestProto.Builder readBlockRequest = GetBlockRequestProto.newBuilder() + .setBlockID(datanodeBlockID.build()); final ContainerCommandRequestProto request = builder - .setDatanodeUuid(datanode.getUuidString()).build(); + .setDatanodeUuid(datanode.getUuidString()) + 
.setGetBlock(readBlockRequest).build(); ContainerCommandResponseProto response = xceiverClient.sendCommand(request, validators); return response.getGetBlock(); @@ -324,12 +327,12 @@ public static ContainerCommandRequestProto getPutBlockRequest( * @throws IOException if there is an I/O error while performing the call */ public static ContainerProtos.ReadChunkResponseProto readChunk( - XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, + XceiverClientSpi xceiverClient, ChunkInfo chunk, DatanodeBlockID blockID, List validators, Token token) throws IOException { ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto.newBuilder() - .setBlockID(blockID.getDatanodeBlockIDProtobuf()) + .setBlockID(blockID) .setChunkData(chunk) .setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1); ContainerCommandRequestProto.Builder builder = @@ -356,7 +359,7 @@ public static ContainerProtos.ReadChunkResponseProto readChunk( } private static ContainerProtos.ReadChunkResponseProto readChunk( - XceiverClientSpi xceiverClient, ChunkInfo chunk, BlockID blockID, + XceiverClientSpi xceiverClient, ChunkInfo chunk, DatanodeBlockID blockID, List validators, ContainerCommandRequestProto.Builder builder, DatanodeDetails d) throws IOException { @@ -378,7 +381,7 @@ private static ContainerProtos.ReadChunkResponseProto readChunk( return response; } - static String toErrorMessage(ChunkInfo chunk, BlockID blockId, + static String toErrorMessage(ChunkInfo chunk, DatanodeBlockID blockId, DatanodeDetails d) { return String.format("Failed to read chunk %s (len=%s) %s from %s", chunk.getChunkName(), chunk.getLen(), blockId, d); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/ClientVersion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/ClientVersion.java index cc6695dc7d6..f3bd1a96b66 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/ClientVersion.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/ClientVersion.java @@ -42,6 +42,10 @@ public enum ClientVersion implements ComponentVersion { "This client version has support for Object Store and File " + "System Optimized Bucket Layouts."), + EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST(4, + "This client version requires the replica index to be set in block requests, fixing read corruption " + + "that could occur when the replicaIndex parameter is not validated before EC block reads."), + FUTURE_VERSION(-1, "Used internally when the server side is older and an" + " unknown client version has arrived from the client."); diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index cf96b2145d7..dce14f0d1fe 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -518,7 +518,16 @@ public static ContainerCommandRequestProto getDeleteContainer( } public static BlockID getTestBlockID(long containerID) { - return new BlockID(containerID, UniqueId.next()); + return getTestBlockID(containerID, null); + } + + public static BlockID getTestBlockID(long containerID, Integer replicaIndex) { + DatanodeBlockID.Builder datanodeBlockID = DatanodeBlockID.newBuilder().setContainerID(containerID) + .setLocalID(UniqueId.next()); + if (replicaIndex != null) { + datanodeBlockID.setReplicaIndex(replicaIndex); + } + return
BlockID.getFromProtobuf(datanodeBlockID.build()); } public static long getTestContainerID() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index e575a93de27..2da5a05cc58 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -116,6 +116,7 @@ import static org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerDataProto.State.RECOVERING; +import static org.apache.hadoop.ozone.ClientVersion.EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST; import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult; import org.apache.ratis.statemachine.StateMachine; @@ -570,6 +571,15 @@ ContainerCommandResponseProto handleEcho( return getEchoResponse(request); } + /** + * Checks whether the request's replicaIndex must be validated, based on the request's client version. + * @param request ContainerCommandRequest object. + * @return true if validation is required for the client version, false otherwise. + */ + private boolean replicaIndexCheckRequired(ContainerCommandRequestProto request) { + return request.hasVersion() && request.getVersion() >= EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST.toProtoValue(); + } + /** * Handle Get Block operation. Calls BlockManager to process the request. */ @@ -588,8 +598,10 @@ ContainerCommandResponseProto handleGetBlock( try { BlockID blockID = BlockID.getFromProtobuf( request.getGetBlock().getBlockID()); - responseData = blockManager.getBlock(kvContainer, blockID) - .getProtoBufMessage(); + if (replicaIndexCheckRequired(request)) { + BlockUtils.verifyReplicaIdx(kvContainer, blockID); + } + responseData = blockManager.getBlock(kvContainer, blockID).getProtoBufMessage(); final long numBytes = responseData.getSerializedSize(); metrics.incContainerBytesStats(Type.GetBlock, numBytes); @@ -691,7 +703,6 @@ ContainerCommandResponseProto handleDeleteBlock( ContainerCommandResponseProto handleReadChunk( ContainerCommandRequestProto request, KeyValueContainer kvContainer, DispatcherContext dispatcherContext) { - if (!request.hasReadChunk()) { if (LOG.isDebugEnabled()) { LOG.debug("Malformed Read Chunk request.
trace ID: {}", @@ -699,7 +710,6 @@ ContainerCommandResponseProto handleReadChunk( } return malformedRequest(request); } - ChunkBuffer data; try { BlockID blockID = BlockID.getFromProtobuf( @@ -707,8 +717,11 @@ ContainerCommandResponseProto handleReadChunk( ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getReadChunk() .getChunkData()); Preconditions.checkNotNull(chunkInfo); - + if (replicaIndexCheckRequired(request)) { + BlockUtils.verifyReplicaIdx(kvContainer, blockID); + } BlockUtils.verifyBCSId(kvContainer, blockID); + if (dispatcherContext == null) { dispatcherContext = DispatcherContext.getHandleReadChunk(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java index 376285c4c72..7773b54f794 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java @@ -28,12 +28,12 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; import org.apache.hadoop.ozone.container.common.utils.ContainerCache; import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache; import org.apache.hadoop.ozone.container.common.utils.RawDB; import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB; -import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import com.google.common.base.Preconditions; @@ -42,6 +42,7 @@ import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaThreeImpl; import org.apache.hadoop.ozone.container.metadata.DatanodeStoreSchemaTwoImpl; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.EXPORT_CONTAINER_METADATA_FAILED; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IMPORT_CONTAINER_METADATA_FAILED; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK; @@ -220,7 +221,7 @@ public static BlockData getBlockData(byte[] bytes) throws IOException { * @param blockID requested block info * @throws IOException if cannot support block's blockCommitSequenceId */ - public static void verifyBCSId(KeyValueContainer container, BlockID blockID) + public static void verifyBCSId(Container container, BlockID blockID) throws IOException { long bcsId = blockID.getBlockCommitSequenceId(); Preconditions.checkNotNull(blockID, @@ -237,6 +238,24 @@ public static void verifyBCSId(KeyValueContainer container, BlockID blockID) } } + /** + * Verify if request's replicaIndex matches with containerData. + * + * @param container container object. + * @param blockID requested block info + * @throws IOException if replicaIndex mismatches. 
+ */ + public static void verifyReplicaIdx(Container container, BlockID blockID) + throws IOException { + Integer containerReplicaIndex = container.getContainerData().getReplicaIndex(); + if (containerReplicaIndex > 0 && !containerReplicaIndex.equals(blockID.getReplicaIndex())) { + throw new StorageContainerException( + "Unable to find the Container with replicaIdx " + blockID.getReplicaIndex() + ". Container " + + container.getContainerData().getContainerID() + " replicaIdx is " + + containerReplicaIndex + ".", CONTAINER_NOT_FOUND); + } + } + /** * Remove container KV metadata from per-disk db store. * @param containerData diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java index af515c33e29..9a2cf7c5eac 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java @@ -39,7 +39,6 @@ import com.google.common.base.Preconditions; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -208,24 +207,11 @@ public static long persistPutBlock(KeyValueContainer container, } @Override - public BlockData getBlock(Container container, BlockID blockID) - throws IOException { - long bcsId = blockID.getBlockCommitSequenceId(); - Preconditions.checkNotNull(blockID, - "BlockID cannot be null in GetBlock request"); - Preconditions.checkNotNull(container, - "Container cannot be null"); - + public BlockData getBlock(Container container, BlockID blockID) throws IOException { + BlockUtils.verifyBCSId(container, blockID); KeyValueContainerData containerData = (KeyValueContainerData) container .getContainerData(); - long containerBCSId = containerData.getBlockCommitSequenceId(); - if (containerBCSId < bcsId) { - throw new StorageContainerException( - "Unable to find the block with bcsID " + bcsId + ". Container " - + containerData.getContainerID() + " bcsId is " - + containerBCSId + ".", UNKNOWN_BCSID); - } - + long bcsId = blockID.getBlockCommitSequenceId(); try (DBHandle db = BlockUtils.getDB(containerData, config)) { // This is a post condition that acts as a hint to the user. // Should never fail. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java index 02b7e93d50f..3eab64b312f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java @@ -58,8 +58,8 @@ long putBlock(Container container, BlockData data, boolean endOfBlock) * @return Block Data. 
* @throws IOException when BcsId is unknown or mismatched */ - BlockData getBlock(Container container, BlockID blockID) - throws IOException; + BlockData getBlock(Container container, BlockID blockID) throws IOException; + /** * Deletes an existing block. diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestBlockData.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestBlockData.java index 5aa4f0d9bf5..11cae324226 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestBlockData.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestBlockData.java @@ -134,7 +134,7 @@ public void testToString() { final BlockID blockID = new BlockID(5, 123); blockID.setBlockCommitSequenceId(42); final BlockData subject = new BlockData(blockID); - assertEquals("[blockId=conID: 5 locID: 123 bcsId: 42, size=0]", + assertEquals("[blockId=conID: 5 locID: 123 bcsId: 42 replicaIndex: null, size=0]", subject.toString()); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java index d9b85a7ce80..0bff809314e 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; +import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.container.common.ContainerTestUtils; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; @@ -34,11 +35,17 @@ import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Arrays; import java.util.UUID; +import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS; @@ -95,6 +102,30 @@ public void testGetBlock() { assertEquals(UNKNOWN_BCSID, response.getResult()); } + private static Stream getAllClientVersions() { + return Arrays.stream(ClientVersion.values()).flatMap(client -> IntStream.range(0, 6) + .mapToObj(rid -> Arguments.of(client, rid))); + } + + + @ParameterizedTest + @MethodSource("getAllClientVersions") + public void testGetBlockWithReplicaIndexMismatch(ClientVersion clientVersion, int replicaIndex) { + KeyValueContainer container = getMockContainerWithReplicaIndex(replicaIndex); + KeyValueHandler handler = getDummyHandler(); + for (int rid = 0; rid <= 5; rid++) { + 
ContainerProtos.ContainerCommandResponseProto response = + handler.handleGetBlock( + getDummyCommandRequestProto(clientVersion, ContainerProtos.Type.GetBlock, rid), + container); + assertEquals((replicaIndex > 0 && rid != replicaIndex && clientVersion.toProtoValue() >= + ClientVersion.EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST.toProtoValue()) ? + ContainerProtos.Result.CONTAINER_NOT_FOUND : UNKNOWN_BCSID, + response.getResult()); + } + + } + @Test public void testGetCommittedBlockLength() { KeyValueContainer container = getMockUnhealthyContainer(); @@ -121,6 +152,23 @@ public void testReadChunk() { assertEquals(UNKNOWN_BCSID, response.getResult()); } + @ParameterizedTest + @MethodSource("getAllClientVersions") + public void testReadChunkWithReplicaIndexMismatch(ClientVersion clientVersion, int replicaIndex) { + KeyValueContainer container = getMockContainerWithReplicaIndex(replicaIndex); + KeyValueHandler handler = getDummyHandler(); + for (int rid = 0; rid <= 5; rid++) { + ContainerProtos.ContainerCommandResponseProto response = + handler.handleReadChunk(getDummyCommandRequestProto(clientVersion, ContainerProtos.Type.ReadChunk, rid), + container, null); + assertEquals((replicaIndex > 0 && rid != replicaIndex && + clientVersion.toProtoValue() >= ClientVersion.EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST.toProtoValue()) ? + ContainerProtos.Result.CONTAINER_NOT_FOUND : UNKNOWN_BCSID, + response.getResult()); + } + + } + @Test public void testGetSmallFile() { KeyValueContainer container = getMockUnhealthyContainer(); @@ -204,4 +252,15 @@ private KeyValueContainer getMockUnhealthyContainer() { .ContainerDataProto.newBuilder().setContainerID(1).build()); return new KeyValueContainer(containerData, new OzoneConfiguration()); } + + private KeyValueContainer getMockContainerWithReplicaIndex(int replicaIndex) { + KeyValueContainerData containerData = mock(KeyValueContainerData.class); + when(containerData.getState()).thenReturn( + ContainerProtos.ContainerDataProto.State.CLOSED); + when(containerData.getBlockCommitSequenceId()).thenReturn(100L); + when(containerData.getReplicaIndex()).thenReturn(replicaIndex); + when(containerData.getProtoBufMessage()).thenReturn(ContainerProtos + .ContainerDataProto.newBuilder().setContainerID(1).build()); + return new KeyValueContainer(containerData, new OzoneConfiguration()); + } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java index 3da23688418..13ba5716987 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ECFileChecksumHelper.java @@ -186,13 +186,10 @@ private List getChunkInfos(OmKeyLocationInfo LOG.debug("Initializing BlockInputStream for get key to access {}", blockID.getContainerID()); } - xceiverClientSpi = - getXceiverClientFactory().acquireClientForReadData(pipeline); + xceiverClientSpi = getXceiverClientFactory().acquireClientForReadData(pipeline); - ContainerProtos.DatanodeBlockID datanodeBlockID = blockID - .getDatanodeBlockIDProtobuf(); ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls - .getBlock(xceiverClientSpi, datanodeBlockID, token); + .getBlock(xceiverClientSpi, blockID, token, pipeline.getReplicaIndexes()); chunks = response.getBlockData().getChunksList(); } finally { diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java index 09f9c7d037e..016121ce1a9 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/checksum/ReplicatedFileChecksumHelper.java @@ -149,13 +149,9 @@ protected List getChunkInfos( LOG.debug("Initializing BlockInputStream for get key to access {}", blockID.getContainerID()); } - xceiverClientSpi = - getXceiverClientFactory().acquireClientForReadData(pipeline); - - ContainerProtos.DatanodeBlockID datanodeBlockID = blockID - .getDatanodeBlockIDProtobuf(); + xceiverClientSpi = getXceiverClientFactory().acquireClientForReadData(pipeline); ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls - .getBlock(xceiverClientSpi, datanodeBlockID, token); + .getBlock(xceiverClientSpi, blockID, token, pipeline.getReplicaIndexes()); chunks = response.getBlockData().getChunksList(); } finally { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java index 241754a57f1..2cf2ab0cf9c 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java @@ -37,7 +37,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -248,8 +247,7 @@ Collection getFailedServers() { @VisibleForTesting Pipeline createSingleECBlockPipeline(Pipeline ecPipeline, DatanodeDetails node, int replicaIndex) { - Map indiciesForSinglePipeline = new HashMap<>(); - indiciesForSinglePipeline.put(node, replicaIndex); + Map indiciesForSinglePipeline = Collections.singletonMap(node, replicaIndex); return Pipeline.newBuilder() .setId(ecPipeline.getId()) .setReplicationConfig(ecPipeline.getReplicationConfig()) diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index 2d40841ee49..76fa1e394f6 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -61,7 +61,7 @@ private static List createStreams( XceiverClientFactory xceiverClientFactory, Function retryFunction, BlockInputStreamFactory blockStreamFactory, - OzoneClientConfig config) { + OzoneClientConfig config) throws IOException { List partStreams = new ArrayList<>(); for (OmKeyLocationInfo omKeyLocationInfo : blockInfos) { if (LOG.isDebugEnabled()) { @@ -121,7 +121,7 @@ private static LengthInputStream getFromOmKeyInfo( Function retryFunction, BlockInputStreamFactory blockStreamFactory, List locationInfos, - OzoneClientConfig config) { + OzoneClientConfig config) throws IOException { List streams = createStreams(keyInfo, locationInfos, xceiverClientFactory, retryFunction, blockStreamFactory, config); @@ -137,7 +137,7 @@ public static LengthInputStream getFromOmKeyInfo(OmKeyInfo keyInfo, XceiverClientFactory 
xceiverClientFactory, Function retryFunction, BlockInputStreamFactory blockStreamFactory, - OzoneClientConfig config) { + OzoneClientConfig config) throws IOException { List keyLocationInfos = keyInfo .getLatestVersionLocations().getBlocksLatestVersionOnly(); @@ -150,7 +150,7 @@ public static List getStreamsFromKeyInfo(OmKeyInfo keyInfo, XceiverClientFactory xceiverClientFactory, Function retryFunction, BlockInputStreamFactory blockStreamFactory, - OzoneClientConfig config) { + OzoneClientConfig config) throws IOException { List keyLocationInfos = keyInfo .getLatestVersionLocations().getBlocksLatestVersionOnly(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java index 99095f55b00..405a92dc33b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java @@ -238,11 +238,11 @@ public XceiverClientReply sendCommandAsync( private void invokeXceiverClientGetBlock(XceiverClientSpi client) throws IOException { ContainerProtocolCalls.getBlock(client, - ContainerProtos.DatanodeBlockID.newBuilder() + BlockID.getFromProtobuf(ContainerProtos.DatanodeBlockID.newBuilder() .setContainerID(1) .setLocalID(1) .setBlockCommitSequenceId(1) - .build(), null); + .build()), null, client.getPipeline().getReplicaIndexes()); } private void invokeXceiverClientReadChunk(XceiverClientSpi client) @@ -259,7 +259,7 @@ private void invokeXceiverClientReadChunk(XceiverClientSpi client) .setLen(-1) .setOffset(0) .build(), - bid, + bid.getDatanodeBlockIDProtobuf(), null, null); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java index 0236eb72ba1..c274d8fea30 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java @@ -453,10 +453,10 @@ public void testCreateRecoveryContainer() throws Exception { .generateToken(ANY_USER, container.containerID()); scm.getContainerManager().getContainerStateManager() .addContainer(container.getProtobuf()); - + int replicaIndex = 4; XceiverClientSpi dnClient = xceiverClientManager.acquireClient( createSingleNodePipeline(newPipeline, newPipeline.getNodes().get(0), - 2)); + replicaIndex)); try { // To create the actual situation, container would have been in closed // state at SCM. 
@@ -471,7 +471,7 @@ public void testCreateRecoveryContainer() throws Exception { String encodedToken = cToken.encodeToUrlString(); ContainerProtocolCalls.createRecoveringContainer(dnClient, container.containerID().getProtobuf().getId(), - encodedToken, 4); + encodedToken, replicaIndex); BlockID blockID = ContainerTestHelper .getTestBlockID(container.containerID().getProtobuf().getId()); @@ -512,7 +512,8 @@ public void testCreateRecoveryContainer() throws Exception { readContainerResponseProto.getContainerData().getState()); ContainerProtos.ReadChunkResponseProto readChunkResponseProto = ContainerProtocolCalls.readChunk(dnClient, - writeChunkRequest.getWriteChunk().getChunkData(), blockID, null, + writeChunkRequest.getWriteChunk().getChunkData(), + blockID.getDatanodeBlockIDProtobufBuilder().setReplicaIndex(replicaIndex).build(), null, blockToken); ByteBuffer[] readOnlyByteBuffersArray = BufferUtils .getReadOnlyByteBuffersArray( diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java index 7a1fdf2c595..c561dd5cc94 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java @@ -21,7 +21,9 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Arrays.asList; import static java.util.Collections.emptyMap; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType.KeyValueContainer; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; @@ -31,38 +33,64 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.any; import java.io.IOException; import java.io.OutputStream; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.function.Supplier; +import com.google.common.base.Functions; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackScatter; import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackAware; import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom; +import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.hadoop.ozone.client.io.OzoneInputStream; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import org.slf4j.event.Level; /** @@ -173,6 +201,22 @@ private void createTestData(OzoneClient client) throws IOException { } } + private byte[] createTestData(OzoneClient client, int size) throws IOException { + ObjectStore objectStore = client.getObjectStore(); + objectStore.createVolume(VOLUME); + OzoneVolume volume = objectStore.getVolume(VOLUME); + volume.createBucket(BUCKET); + OzoneBucket bucket = volume.getBucket(BUCKET); + try (OutputStream out = bucket.createKey(KEY, 0, new ECReplicationConfig("RS-3-2-1k"), + new HashMap<>())) { + byte[] b = new byte[size]; + Random random = new Random(); + random.nextBytes(b); + out.write(b); + return b; + } + } + private static List lookupKey(MiniOzoneCluster cluster) throws IOException { OmKeyArgs keyArgs = new OmKeyArgs.Builder() @@ -186,4 +230,155 @@ private static List lookupKey(MiniOzoneCluster cluster) return locations.getLocationList(); } + private static OmKeyLocationInfo lookupKeyFirstLocation(MiniOzoneCluster cluster) + throws IOException { + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(VOLUME) + .setBucketName(BUCKET) + .setKeyName(KEY) + .build(); + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + OmKeyLocationInfoGroup locations = keyInfo.getLatestVersionLocations(); + Assertions.assertNotNull(locations); + return locations.getLocationList().get(0); + } + + + public void assertState(MiniOzoneCluster cluster, Map expectedReplicaMap) + throws IOException { + OmKeyLocationInfo keyLocation = lookupKeyFirstLocation(cluster); + Map replicaMap = + keyLocation.getPipeline().getNodes().stream().collect(Collectors.toMap( + dn -> keyLocation.getPipeline().getReplicaIndex(dn), Functions.identity())); + Assertions.assertEquals(expectedReplicaMap, replicaMap); + } + + private OzoneInputStream createInputStream(OzoneClient client) throws IOException { + ObjectStore objectStore = client.getObjectStore(); + 
OzoneVolume volume = objectStore.getVolume(VOLUME); + OzoneBucket bucket = volume.getBucket(BUCKET); + return bucket.readKey(KEY); + } + + private void mockContainerProtocolCalls(final MockedStatic mockedContainerProtocolCalls, + final Map failedReadChunkCountMap) { + mockedContainerProtocolCalls.when(() -> ContainerProtocolCalls.readChunk(any(), any(), any(), anyList(), any())) + .thenAnswer(invocation -> { + int replicaIndex = ((ContainerProtos.DatanodeBlockID) invocation.getArgument(2)).getReplicaIndex(); + try { + return invocation.callRealMethod(); + } catch (Throwable e) { + failedReadChunkCountMap.compute(replicaIndex, + (replicaIdx, totalCount) -> totalCount == null ? 1 : (totalCount + 1)); + throw e; + } + }); + } + + + private static void deleteContainer(MiniOzoneCluster cluster, DatanodeDetails dn, long containerId) + throws IOException { + OzoneContainer container = cluster.getHddsDatanode(dn).getDatanodeStateMachine().getContainer(); + Container containerData = container.getContainerSet().getContainer(containerId); + if (containerData != null) { + container.getDispatcher().getHandler(KeyValueContainer).deleteContainer(containerData, true); + } + cluster.getHddsDatanode(dn).getDatanodeStateMachine().triggerHeartbeat(); + } + + + @Test + public void testECContainerReplication() throws Exception { + OzoneConfiguration conf = createConfiguration(false); + final Map failedReadChunkCountMap = new ConcurrentHashMap<>(); + // Overriding config to support a 1k chunk size + conf.set("ozone.replication.allowed-configs", "(^((STANDALONE|RATIS)/(ONE|THREE))|(EC/(3-2|6-3|10-4)-" + + "(512|1024|2048|4096|1)k)$)"); + conf.set(OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_KEY, SCMContainerPlacementRackScatter.class.getCanonicalName()); + try (MockedStatic mockedContainerProtocolCalls = + Mockito.mockStatic(ContainerProtocolCalls.class, Mockito.CALLS_REAL_METHODS);) { + mockContainerProtocolCalls(mockedContainerProtocolCalls, failedReadChunkCountMap); + // Creating Cluster with 5 Nodes + try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build()) { + cluster.waitForClusterToBeReady(); + try (OzoneClient client = OzoneClientFactory.getRpcClient(conf)) { + Set allNodes = + cluster.getHddsDatanodes().stream().map(HddsDatanodeService::getDatanodeDetails).collect( + Collectors.toSet()); + List initialNodesWithData = new ArrayList<>(); + // Keeping the first 5 DNs to hold data and shutting down any additional node + for (DatanodeDetails dn : allNodes) { + if (initialNodesWithData.size() < 5) { + initialNodesWithData.add(dn); + } else { + cluster.shutdownHddsDatanode(dn); + } + } + + // Creating 2 stripes with Chunk Size 1k + int size = 6 * 1024; + byte[] originalData = createTestData(client, size); + + // Getting latest location of the key + final OmKeyLocationInfo keyLocation = lookupKeyFirstLocation(cluster); + long containerID = keyLocation.getContainerID(); + waitForContainerClose(cluster, containerID); + + // Forming Replica Index Map + Map replicaIndexMap = + initialNodesWithData.stream().map(dn -> new Object[]{dn, keyLocation.getPipeline().getReplicaIndex(dn)}) + .collect( + Collectors.toMap(x -> (Integer) x[1], x -> (DatanodeDetails) x[0])); + + // Reading through the file and comparing with the input data.
+ byte[] readData = new byte[size]; + try (OzoneInputStream inputStream = createInputStream(client)) { + inputStream.read(readData); + Assertions.assertArrayEquals(readData, originalData); + } + Assertions.assertEquals(0, failedReadChunkCountMap.size()); + // Opening a new stream before we make changes to the blocks. + try (OzoneInputStream inputStream = createInputStream(client)) { + int firstReadLen = 1024 * 3; + Arrays.fill(readData, (byte) 0); + // Reading the first stripe. + inputStream.read(readData, 0, firstReadLen); + Assertions.assertEquals(0, failedReadChunkCountMap.size()); + // Checking the initial state as per the latest location. + assertState(cluster, ImmutableMap.of(1, replicaIndexMap.get(1), 2, replicaIndexMap.get(2), + 3, replicaIndexMap.get(3), 4, replicaIndexMap.get(4), 5, replicaIndexMap.get(5))); + + // Stopping replication manager + cluster.getStorageContainerManager().getReplicationManager().stop(); + // Deleting the container from DN1 & DN3 + deleteContainer(cluster, replicaIndexMap.get(1), containerID); + deleteContainer(cluster, replicaIndexMap.get(3), containerID); + // Waiting for replica count of container to come down to 3. + waitForReplicaCount(containerID, 3, cluster); + // Shutting down DN1 + cluster.shutdownHddsDatanode(replicaIndexMap.get(1)); + // Starting replication manager which should process under replication & write replica 1 to DN3. + cluster.getStorageContainerManager().getReplicationManager().start(); + waitForReplicaCount(containerID, 4, cluster); + // Asserting Replica 1 has been written to DN3. + assertState(cluster, ImmutableMap.of(1, replicaIndexMap.get(3), 2, replicaIndexMap.get(2), + 4, replicaIndexMap.get(4), 5, replicaIndexMap.get(5))); + // Starting DN1. + cluster.restartHddsDatanode(replicaIndexMap.get(1), false); + // Waiting for underreplication to get resolved. + waitForReplicaCount(containerID, 5, cluster); + // Asserting Replica 1 & Replica 3 have been swapped between DN1 & DN3. + assertState(cluster, ImmutableMap.of(1, replicaIndexMap.get(3), 2, replicaIndexMap.get(2), + 3, replicaIndexMap.get(1), 4, replicaIndexMap.get(4), 5, replicaIndexMap.get(5))); + // Reading stripe 2 from the pre-initialized inputStream + inputStream.read(readData, firstReadLen, size - firstReadLen); + // Asserting there was one failed readChunk call each for replicas 1 and 3.
+ Assertions.assertEquals(ImmutableMap.of(1, 1, 3, 1), failedReadChunkCountMap); + Assertions.assertArrayEquals(readData, originalData); + } + } + } + } + } + } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java index e0d5ef4084d..9e2077593ce 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -81,7 +82,7 @@ public void testErrorReadGroupInputStream() throws Exception { } @Nonnull - private List createInputStreams(String dataString) { + private List createInputStreams(String dataString) throws IOException { byte[] buf = dataString.getBytes(UTF_8); List streams = new ArrayList<>(); int offset = 0; @@ -93,7 +94,7 @@ private List createInputStreams(String dataString) { return streams; } - private BlockInputStream createStream(byte[] buf, int offset) { + private BlockInputStream createStream(byte[] buf, int offset) throws IOException { OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); clientConfig.setChecksumVerify(true); return new BlockInputStream(null, 100, null, null, null,