
HDDS-10983. EC Key read corruption when the replica index of container in DN mismatches #6779

Merged 23 commits on Jun 20, 2024
Commits
a1d5c4a
HDDS-10983. EC Key read corruption when the replica index of containe…
swamirishi Jun 6, 2024
8815940
HDDS-10983. Add Testcase
swamirishi Jun 6, 2024
4287270
HDDS-10983. Make the read api compatible with older clients
swamirishi Jun 7, 2024
2bb1b9d
HDDS-10983. Remove stacktrace
swamirishi Jun 7, 2024
5886cf4
HDDS-10983. Fix checkstyle
swamirishi Jun 7, 2024
cfc7415
HDDS-10983. Address review comments
swamirishi Jun 7, 2024
8edc907
HDDS-10983. Fix testcase
swamirishi Jun 7, 2024
1476d59
HDDS-10983. Add client version test
swamirishi Jun 8, 2024
437594d
HDDS-10983. Fix test cases
swamirishi Jun 9, 2024
65994f3
HDDS-10983. Fix issues
swamirishi Jun 10, 2024
731107c
HDDS-10983. Fix testcases
swamirishi Jun 10, 2024
5857567
HDDS-10983. Fix checkstyle
swamirishi Jun 10, 2024
dd11beb
HDDS-10983. Fix Acceptance test
swamirishi Jun 11, 2024
4c4e2c7
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi Jun 11, 2024
0d769da
HDDS-10983. Fix checkstyle
swamirishi Jun 11, 2024
45279a3
HDDS-10983. change pipeline supplier usage
swamirishi Jun 13, 2024
58b089d
HDDS-10983. Address review comments and simplify testcase
swamirishi Jun 14, 2024
e4c0d67
HDDS-10983. Convert to mapToInt
swamirishi Jun 14, 2024
da98011
HDDS-11013. Merge upstream master
swamirishi Jun 17, 2024
5db065f
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi Jun 17, 2024
b0764e4
HDDS-10983. Merge master
swamirishi Jun 17, 2024
3ae6768
HDDS-10983. Address review comments
swamirishi Jun 19, 2024
040d6d8
HDDS-10983. Move replica index validation outside block manager
swamirishi Jun 19, 2024
@@ -71,6 +71,7 @@
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdds.HddsUtils.processForDebug;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.getContainerCommandRequestProtoBuilder;

/**
* {@link XceiverClientSpi} implementation, the standalone client.
@@ -337,8 +338,7 @@ private XceiverClientReply sendCommandWithTraceIDAndRetry(

return TracingUtil.executeInNewSpan(spanName,
() -> {
ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
ContainerCommandRequestProto finalPayload = getContainerCommandRequestProtoBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan()).build();
return sendCommandWithRetry(finalPayload, validators);
});
@@ -450,7 +450,7 @@ private XceiverClientReply sendCommandWithRetry(
processForDebug(request), pipeline);
} else {
LOG.error(message + " on the pipeline {}.",
request.getCmdType(), pipeline);
request.getCmdType(), pipeline, new RuntimeException());
}
throw ioException;
}
@@ -490,8 +490,7 @@ public XceiverClientReply sendCommandAsync(

try (Scope ignored = GlobalTracer.get().activateSpan(span)) {

ContainerCommandRequestProto finalPayload =
ContainerCommandRequestProto.newBuilder(request)
ContainerCommandRequestProto finalPayload = getContainerCommandRequestProtoBuilder(request)
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
XceiverClientReply asyncReply =
@@ -62,6 +62,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.getContainerCommandRequestProtoBuilder;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;

/**
@@ -204,7 +205,7 @@ private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
// it or remove it completely if possible
String id = pipeline.getFirstNode().getUuidString();
ContainerProtos.ContainerCommandRequestProto.Builder builder =
ContainerProtos.ContainerCommandRequestProto.newBuilder()
getContainerCommandRequestProtoBuilder()
Contributor:
The patch is huge (122K), hard to review. Size could be reduced by prefactoring:

  • introduce this factory method
  • replace all calls of newBuilder()

in a separate patch, without any functional changes, before the fix.
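The suggested prefactoring pattern can be sketched as follows. The real `ContainerCommandRequestProto` is protobuf-generated, so it is stubbed here with a plain builder; the stub class, its fields, and the `CLIENT_VERSION` default are hypothetical, while the factory-method name `getContainerCommandRequestProtoBuilder` comes from the diff:

```java
// Hypothetical stand-in for the protobuf-generated request type.
final class ContainerCommandRequest {
  static final int CLIENT_VERSION = 3; // assumed current client version

  static class Builder {
    private int version;
    private String traceId = "";

    Builder setVersion(int v) { this.version = v; return this; }
    Builder setTraceID(String id) { this.traceId = id; return this; }
    ContainerCommandRequest build() {
      return new ContainerCommandRequest(version, traceId);
    }
  }

  private final int version;
  private final String traceId;

  private ContainerCommandRequest(int version, String traceId) {
    this.version = version;
    this.traceId = traceId;
  }

  int getVersion() { return version; }
}

// Every call site uses this factory instead of newBuilder(), so a
// cross-cutting default (e.g. the client version) is set in one place.
final class ContainerProtocolCallsSketch {
  static ContainerCommandRequest.Builder getContainerCommandRequestProtoBuilder() {
    return new ContainerCommandRequest.Builder()
        .setVersion(ContainerCommandRequest.CLIENT_VERSION);
  }
}
```

Introducing the factory first, as a no-op refactor, keeps the later functional change small and reviewable.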

Contributor Author:

OK, will raise a patch for this.

Contributor Author:

@adoroszlai @sodonnel I have raised another patch, since this one is becoming too big to review: #6812

Contributor:

The other PR is merged now.

.setCmdType(ContainerProtos.Type.StreamInit)
.setContainerID(blockID.get().getContainerID())
.setDatanodeUuid(id).setWriteChunk(writeChunkRequest);
@@ -24,9 +24,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.client.BlockID;
@@ -116,7 +118,7 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverClientFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
OzoneClientConfig config) throws IOException {
this.blockID = blockId;
this.length = blockLen;
setPipeline(pipeline);
@@ -133,7 +135,7 @@ public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverClientFactory,
OzoneClientConfig config
) {
) throws IOException {
this(blockId, blockLen, pipeline, token,
xceiverClientFactory, null, config);
}
@@ -244,33 +246,28 @@ protected List<ChunkInfo> getChunkInfoList() throws IOException {

@VisibleForTesting
protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
final Pipeline pipeline = xceiverClient.getPipeline();

Pipeline pipeline = pipelineRef.get();
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing BlockInputStream for get key to access {}",
blockID.getContainerID());
}

DatanodeBlockID.Builder blkIDBuilder =
DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID())
.setLocalID(blockID.getLocalID())
.setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());

int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
if (replicaIndex > 0) {
blkIDBuilder.setReplicaIndex(replicaIndex);
LOG.debug("Initializing BlockInputStream for get key to access {} with pipeline {}.",
blockID.getContainerID(), pipeline);
}

GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get());
xceiverClient, VALIDATORS, blockID, tokenRef.get(), pipeline.getReplicaIndexes());

return response.getBlockData().getChunksList();
}

private void setPipeline(Pipeline pipeline) {
private void setPipeline(Pipeline pipeline) throws IOException {
if (pipeline == null) {
return;
}
Set<Integer> replicaIndexes =
pipeline.getNodes().stream().map(pipeline::getReplicaIndex).collect(Collectors.toSet());
if (replicaIndexes.size() > 1) {
throw new IOException(String.format("Pipeline: %s has nodes containing different replica indexes.",
pipeline));
}

// irrespective of the container state, we will always read via Standalone
// protocol.
@@ -26,6 +26,8 @@
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -60,6 +62,7 @@ public class ChunkInputStream extends InputStream
private final ChunkInfo chunkInfo;
private final long length;
private final BlockID blockID;
private ContainerProtos.DatanodeBlockID datanodeBlockID;
private final XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private final Supplier<Pipeline> pipelineSupplier;
@@ -290,13 +293,27 @@ protected synchronized void releaseClient() {
}
}

/**
* Updates the datanodeBlockID based on the pipeline's replica index.
*/
private void updateDatanodeBlockId(Pipeline pipeline) throws IOException {
DatanodeDetails closestNode = pipeline.getClosestNode();
int replicaIdx = pipeline.getReplicaIndex(closestNode);
ContainerProtos.DatanodeBlockID.Builder builder = blockID.getDatanodeBlockIDProtobufBuilder();
if (replicaIdx > 0) {
builder.setReplicaIndex(replicaIdx);
}
datanodeBlockID = builder.build();
}

/**
* Acquire new client if previous one was released.
*/
protected synchronized void acquireClient() throws IOException {
if (xceiverClientFactory != null && xceiverClient == null) {
xceiverClient = xceiverClientFactory.acquireClientForReadData(
pipelineSupplier.get());
Pipeline pipeline = pipelineSupplier.get();
xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
updateDatanodeBlockId(pipeline);
}
}

@@ -422,8 +439,8 @@ protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
throws IOException {

ReadChunkResponseProto readChunkResponse =
ContainerProtocolCalls.readChunk(xceiverClient,
readChunkInfo, blockID, validators, tokenSupplier.get());
ContainerProtocolCalls.readChunk(xceiverClient, readChunkInfo, datanodeBlockID, validators,
tokenSupplier.get());

if (readChunkResponse.hasData()) {
return readChunkResponse.getData().asReadOnlyByteBufferList()
@@ -27,6 +27,7 @@
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

import java.io.IOException;
import java.util.function.Function;

/**
@@ -52,6 +53,6 @@ BlockExtendedInputStream create(ReplicationConfig repConfig,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config);
OzoneClientConfig config) throws IOException;

}
@@ -32,6 +32,7 @@
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.security.token.Token;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
@@ -80,7 +81,7 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
Token<OzoneBlockTokenIdentifier> token,
XceiverClientFactory xceiverFactory,
Function<BlockID, BlockLocationInfo> refreshFunction,
OzoneClientConfig config) {
OzoneClientConfig config) throws IOException {
if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
blockInfo, xceiverFactory, refreshFunction,
@@ -164,7 +164,7 @@ protected int currentStreamIndex() {
* stream if it has not been opened already.
* @return BlockInput stream to read from.
*/
protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
protected BlockExtendedInputStream getOrOpenStream(int locationIndex) throws IOException {
BlockExtendedInputStream stream = blockStreams[locationIndex];
if (stream == null) {
// To read an EC block, we create a STANDALONE pipeline that contains the
@@ -176,8 +176,8 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE))
.setNodes(Arrays.asList(dataLocation))
.setId(PipelineID.valueOf(dataLocation.getUuid())).setReplicaIndexes(
ImmutableMap.of(dataLocation, locationIndex + 1))
.setId(PipelineID.valueOf(dataLocation.getUuid()))
.setReplicaIndexes(ImmutableMap.of(dataLocation, locationIndex + 1))
.setState(Pipeline.PipelineState.CLOSED)
.build();

@@ -228,6 +228,7 @@ protected Function<BlockID, BlockLocationInfo> ecPipelineRefreshFunction(
HddsProtos.ReplicationFactor.ONE))
.setNodes(Collections.singletonList(curIndexNode))
.setId(PipelineID.randomId())
.setReplicaIndexes(Collections.singletonMap(curIndexNode, replicaIndex))
.setState(Pipeline.PipelineState.CLOSED)
.build();
blockLocationInfo.setPipeline(pipeline);
@@ -49,7 +49,7 @@ class DummyBlockInputStream extends BlockInputStream {
Function<BlockID, BlockLocationInfo> refreshFunction,
List<ChunkInfo> chunkList,
Map<String, byte[]> chunks,
OzoneClientConfig config) {
OzoneClientConfig config) throws IOException {
super(blockId, blockLen, pipeline, token,
xceiverClientManager, refreshFunction, config);
this.chunkDataMap = chunks;
@@ -56,7 +56,7 @@ final class DummyBlockInputStreamWithRetry
List<ChunkInfo> chunkList,
Map<String, byte[]> chunkMap,
AtomicBoolean isRerfreshed, IOException ioException,
OzoneClientConfig config) {
OzoneClientConfig config) throws IOException {
super(blockId, blockLen, pipeline, token,
xceiverClientManager, blockID -> {
isRerfreshed.set(true);
@@ -355,7 +355,7 @@ private static ChunkInputStream throwingChunkInputStream(IOException ex,
}

private BlockInputStream createSubject(BlockID blockID, Pipeline pipeline,
ChunkInputStream stream) {
ChunkInputStream stream) throws IOException {
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setChecksumVerify(false);
return new DummyBlockInputStream(blockID, blockSize, pipeline, null,
@@ -32,13 +32,16 @@
import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.mockito.ArgumentMatchers.any;

/**
* Tests for BlockInputStreamFactoryImpl.
@@ -48,14 +51,16 @@ public class TestBlockInputStreamFactoryImpl {
private OzoneConfiguration conf = new OzoneConfiguration();

@Test
public void testNonECGivesBlockInputStream() {
public void testNonECGivesBlockInputStream() throws IOException {
BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
ReplicationConfig repConfig =
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);

BlockLocationInfo blockInfo = createKeyLocationInfo(repConfig, 3,
1024 * 1024 * 10);

Pipeline pipeline = Mockito.spy(blockInfo.getPipeline());
blockInfo.setPipeline(pipeline);
Mockito.when(pipeline.getReplicaIndex(any(DatanodeDetails.class))).thenReturn(1);
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setChecksumVerify(true);
BlockExtendedInputStream stream =
@@ -68,7 +73,7 @@ public void testNonECGivesBlockInputStream() {
}

@Test
public void testECGivesECBlockInputStream() {
public void testECGivesECBlockInputStream() throws IOException {
BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
ReplicationConfig repConfig =
new ECReplicationConfig(3, 2);