Skip to content

Commit

Permalink
HDDS-11475. EC: Verify EC reconstruction correctness on DN
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanbenWang committed Sep 19, 2024
1 parent 70b8dd5 commit 67702f2
Show file tree
Hide file tree
Showing 10 changed files with 564 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ public enum ChecksumCombineMode {
tags = ConfigTag.CLIENT)
private long excludeNodesExpiryTime = 10 * 60 * 1000;

/**
 * Whether EC reconstruction output is validated; off by default.
 * Read through {@code isEcReconstructValidation()} and changed through
 * {@code setEcReconstructValidation(boolean)}.
 */
@Config(key = "ec.reconstruct.validation",
defaultValue = "false",
description = "The flag whether to validate that EC reconstruction tasks " +
"reconstruct target containers correctly. When validation fails, " +
"reconstruction tasks will fail.",
tags = ConfigTag.CLIENT)
private boolean ecReconstructValidation = false;

@Config(key = "ec.reconstruct.stripe.read.pool.limit",
defaultValue = "30",
description = "Thread pool max size for parallelly read" +
Expand Down Expand Up @@ -493,6 +501,14 @@ public ChecksumCombineMode getChecksumCombineMode() {
}
}

/**
 * Turns validation of EC reconstruction output on or off.
 *
 * @param ecReconstructValidation {@code true} to enable validation
 */
public void setEcReconstructValidation(boolean ecReconstructValidation) {
this.ecReconstructValidation = ecReconstructValidation;
}

/**
 * Reports whether validation of EC reconstruction output is enabled.
 *
 * @return the current value of the {@code ecReconstructValidation} flag
 */
public boolean isEcReconstructValidation() {
return ecReconstructValidation;
}

/**
 * Sets the EC reconstruction stripe-read pool limit.
 *
 * @param poolLimit new value stored in
 *     {@code ecReconstructStripeReadPoolLimit}
 */
public void setEcReconstructStripeReadPoolLimit(int poolLimit) {
this.ecReconstructStripeReadPoolLimit = poolLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.ozone.erasurecode.rawcoder.DecodingValidator;
import org.apache.ozone.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
import org.apache.ratis.util.Preconditions;
Expand Down Expand Up @@ -143,7 +144,8 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
private final ByteBufferPool byteBufferPool;

private RawErasureDecoder decoder;

// Created lazily in init(), and only when validation is enabled.
private DecodingValidator validator;
// Taken from the client config (isEcReconstructValidation()) in the
// constructor; gates both validator creation and the validate() call.
private boolean isValidationEnabled = false;
private boolean initialized = false;

private final ExecutorService executor;
Expand Down Expand Up @@ -171,6 +173,7 @@ public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
paddingIndexes = setOfRange(expectedDataBlocks, d);
parityIndexes = setOfRange(d, repConfig.getRequiredNodes());
allIndexes = setOfRange(0, repConfig.getRequiredNodes());
this.isValidationEnabled = config.isEcReconstructValidation();
}

/**
Expand Down Expand Up @@ -231,6 +234,10 @@ private void init() throws InsufficientLocationsException {
if (decoder == null) {
decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
}
if (isValidationEnabled && validator == null) {
LOG.debug("Init decoding validator.");
validator = new DecodingValidator(decoder);
}
if (!hasSufficientLocations()) {
String msg = "There are insufficient datanodes to read the EC block";
LOG.debug("{}: {}", this, msg);
Expand Down Expand Up @@ -690,7 +697,15 @@ private void decodeStripe() throws IOException {
int[] erasedIndexes = missingIndexes.stream()
.mapToInt(Integer::valueOf)
.toArray();
decoder.decode(decoderInputBuffers, erasedIndexes, decoderOutputBuffers);
if (isValidationEnabled) {
markBuffers(decoderInputBuffers);
decoder.decode(decoderInputBuffers, erasedIndexes, decoderOutputBuffers);
resetBuffers(decoderInputBuffers);

validator.validate(decoderInputBuffers, erasedIndexes, decoderOutputBuffers);
} else {
decoder.decode(decoderInputBuffers, erasedIndexes, decoderOutputBuffers);
}
flipInputs();
}

Expand Down Expand Up @@ -826,4 +841,20 @@ private static SortedSet<Integer> setOfRange(
.boxed().collect(toCollection(TreeSet::new));
}

/**
 * Records the current position of every non-null buffer as its mark, so a
 * later {@code reset()} can return to it. Null entries are skipped.
 *
 * @param buffers buffers to mark; individual entries may be null
 */
private static void markBuffers(ByteBuffer[] buffers) {
  for (int i = 0; i < buffers.length; i++) {
    final ByteBuffer candidate = buffers[i];
    if (candidate == null) {
      continue;
    }
    candidate.mark();
  }
}

/**
 * Moves every non-null buffer back to its previously set mark. Null entries
 * are skipped.
 *
 * @param buffers buffers to reset; individual entries may be null
 */
private static void resetBuffers(ByteBuffer[] buffers) {
  for (int i = 0; i < buffers.length; i++) {
    final ByteBuffer candidate = buffers[i];
    if (candidate == null) {
      continue;
    }
    candidate.reset();
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,7 @@ private ECBlockReconstructedStripeInputStream createInputStream(
BlockLocationInfo keyInfo) {
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
clientConfig.setChecksumVerify(true);
clientConfig.setEcReconstructValidation(true);
return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
null, null, streamFactory, bufferPool, ecReconstructExecutor,
clientConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.security.token.Token;
import org.apache.ozone.erasurecode.rawcoder.InvalidDecodingException;
import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -319,6 +320,9 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
// lengths etc.
logBlockGroupDetails(blockLocationInfo, repConfig,
blockDataGroup);
if (e instanceof InvalidDecodingException) {
getECReconstructionMetrics().incrECInvalidReconstructionTasks();
}
throw e;
}
// TODO: can be submitted in parallel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public final class ECReconstructionMetrics {
private @Metric MutableCounterLong blockGroupReconstructionFailsTotal;
private @Metric MutableCounterLong reconstructionTotal;
private @Metric MutableCounterLong reconstructionFailsTotal;
// Bumped through incrECInvalidReconstructionTasks() and read back through
// getECInvalidReconstructionTasks().
@Metric("Count of erasure coding invalidated reconstruction tasks")
private MutableCounterLong ecInvalidReconstructionTasks;

private ECReconstructionMetrics() {
}
Expand Down Expand Up @@ -77,4 +79,12 @@ public long getReconstructionTotal() {
public long getBlockGroupReconstructionTotal() {
return blockGroupReconstructionTotal.value();
}

/**
 * Increments the counter of reconstruction tasks whose decoded output
 * was found to be invalid.
 */
public void incrECInvalidReconstructionTasks() {
ecInvalidReconstructionTasks.incr();
}

/**
 * Returns the current value of the invalid-reconstruction-task counter.
 *
 * @return value of {@code ecInvalidReconstructionTasks}
 */
public long getECInvalidReconstructionTasks() {
return ecInvalidReconstructionTasks.value();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package org.apache.ozone.erasurecode.rawcoder;


import com.google.common.annotations.VisibleForTesting;
import org.apache.ozone.erasurecode.ECChunk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * Checks the result of an erasure-coding decode by decoding one of the
 * original inputs back from the decoded outputs and comparing the two.
 *
 * <p>The scratch buffer and the indexes chosen by the latest
 * {@code validate} call are kept in plain instance fields and no
 * synchronization is performed here.
 */
public class DecodingValidator {

  private static final Logger LOG =
      LoggerFactory.getLogger(DecodingValidator.class);

  /** Decoder used for the verification pass. */
  private final RawErasureDecoder decoder;
  /** Scratch buffer that receives the re-decoded unit; reused across calls. */
  private ByteBuffer buffer;
  /** Valid indexes of the inputs built by the latest validate call. */
  private int[] newValidIndexes;
  /** Index erased for the verification pass of the latest validate call. */
  private int newErasedIndex;

  /**
   * Creates a validator that re-decodes with the given decoder.
   *
   * @param decoder decoder used for the verification pass
   */
  public DecodingValidator(RawErasureDecoder decoder) {
    this.decoder = decoder;
  }

  /**
   * Validates decoded outputs by reconstructing one of the original inputs
   * from them and checking that it matches the original.
   *
   * <p>Example for RS (6, 3) with sources (d0..d5) and parities (p0..p2):
   * <pre>
   *   inputs        = [d0, null (d1), d2, d3, d4, d5, null (p0), p1, null (p2)]
   *   erasedIndexes = [1, 6]
   *   outputs       = [d1, p0]
   * </pre>
   * The verification pass then uses
   * <pre>
   *   newInputs        = [d1, d2, d3, d4, d5, p0]
   *   newErasedIndexes = [0]
   *   newOutputs       = [d0']
   * </pre>
   * and compares d0 with d0'. When the initial outputs are wrong the
   * comparison fails with high probability.
   *
   * <p>The input buffers must be positioned where their data starts: if a
   * decoder has already consumed them, their positions have to be reset
   * before calling this method. {@code outputs} and {@code erasedIndexes}
   * are left unchanged.
   *
   * @param inputs input buffers used for decoding; their positions are
   *     moved to their limits by this method
   * @param erasedIndexes indexes of the erased units used for decoding
   * @param outputs decoded output buffers; they are restored to the
   *     position they had on entry, ready to be read
   * @throws IOException if decoding fails; an
   *     {@link InvalidDecodingException} if the re-decoded unit differs
   *     from the original input
   */
  public void validate(ByteBuffer[] inputs, int[] erasedIndexes,
      ByteBuffer[] outputs) throws IOException {
    markBuffers(outputs);

    try {
      final ByteBuffer sample = CoderUtil.findFirstValidInput(inputs);
      final boolean direct = sample.isDirect();
      final int sampleCapacity = sample.capacity();
      final int length = sample.remaining();

      // Reuse the scratch buffer only if it has the right kind and size.
      final boolean reusable = buffer != null
          && buffer.isDirect() == direct
          && buffer.capacity() >= length;
      if (!reusable) {
        buffer = allocateBuffer(direct, sampleCapacity);
      }
      buffer.clear().limit(length);

      // Seed the verification inputs with the decoded outputs.
      final ByteBuffer[] newInputs = new ByteBuffer[inputs.length];
      int filled = 0;
      for (int i = 0; i < erasedIndexes.length; i++) {
        newInputs[erasedIndexes[i]] = outputs[i];
        filled++;
      }

      // Erase the first available original input, then top up with the
      // remaining originals until as many units are present as before.
      newErasedIndex = -1;
      final int target = CoderUtil.getValidIndexes(inputs).length;
      for (int i = 0; i < newInputs.length && filled != target; i++) {
        if (newErasedIndex < 0 && inputs[i] != null) {
          newErasedIndex = i;
          newInputs[i] = null;
          continue;
        }
        if (newInputs[i] != null) {
          continue;
        }
        newInputs[i] = inputs[i];
        if (inputs[i] != null) {
          filled++;
        }
      }

      // Keep it for testing
      newValidIndexes = CoderUtil.getValidIndexes(newInputs);

      final int[] verificationErased = {newErasedIndex};
      final ByteBuffer[] verificationOutputs = {buffer};
      decoder.decode(newInputs, verificationErased, verificationOutputs);

      final boolean matches = buffer.equals(inputs[newErasedIndex]);
      if (!matches) {
        throw new InvalidDecodingException("Failed to validate decoding");
      }
      LOG.debug("Success to validate decoding.");
    } finally {
      toLimits(inputs);
      resetBuffers(outputs);
    }
  }

  /**
   * Chunk-based variant of
   * {@link #validate(ByteBuffer[], int[], ByteBuffer[])}: converts the
   * chunks to buffers and delegates.
   *
   * @param inputs input chunks used for decoding
   * @param erasedIndexes indexes of the erased units used for decoding
   * @param outputs decoded output chunks
   * @throws IOException if decoding or validation fails
   */
  public void validate(ECChunk[] inputs, int[] erasedIndexes, ECChunk[] outputs)
      throws IOException {
    final ByteBuffer[] inputBuffers = CoderUtil.toBuffers(inputs);
    final ByteBuffer[] outputBuffers = CoderUtil.toBuffers(outputs);
    validate(inputBuffers, erasedIndexes, outputBuffers);
  }

  /** Allocates a direct or heap buffer of the requested capacity. */
  private static ByteBuffer allocateBuffer(boolean direct, int capacity) {
    return direct
        ? ByteBuffer.allocateDirect(capacity)
        : ByteBuffer.allocate(capacity);
  }

  /** Marks the current position of every non-null buffer. */
  private static void markBuffers(ByteBuffer[] buffers) {
    for (ByteBuffer candidate : buffers) {
      if (candidate == null) {
        continue;
      }
      candidate.mark();
    }
  }

  /** Returns every non-null buffer to its mark. */
  private static void resetBuffers(ByteBuffer[] buffers) {
    for (ByteBuffer candidate : buffers) {
      if (candidate == null) {
        continue;
      }
      candidate.reset();
    }
  }

  /** Moves the position of every non-null buffer to its limit. */
  private static void toLimits(ByteBuffer[] buffers) {
    for (ByteBuffer candidate : buffers) {
      if (candidate == null) {
        continue;
      }
      candidate.position(candidate.limit());
    }
  }

  /** Returns the valid indexes of the inputs built by the latest validate call. */
  @VisibleForTesting
  protected int[] getNewValidIndexes() {
    return newValidIndexes;
  }

  /** Returns the index erased for the latest verification pass. */
  @VisibleForTesting
  protected int getNewErasedIndex() {
    return newErasedIndex;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ozone.erasurecode.rawcoder;


import java.io.IOException;

/**
 * Signals that the output of an erasure-coding decode did not pass
 * validation.
 */
public class InvalidDecodingException extends IOException {

  private static final long serialVersionUID = 0L;

  /**
   * Creates the exception with a description of the failure.
   *
   * @param description detail message reported by {@code getMessage()}
   */
  public InvalidDecodingException(String description) {
    super(description);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -519,4 +519,16 @@ protected void corruptSomeChunk(ECChunk[] chunks) {
buffer.position(buffer.position() + 1);
}
}

/**
 * Pollutes one randomly chosen chunk by incrementing the byte at its
 * buffer's current position. The buffer's position is put back to where
 * it was before the write.
 *
 * @param chunks chunks from which the one to pollute is picked
 */
protected void polluteSomeChunk(ECChunk[] chunks) {
  final int victim = new Random().nextInt(chunks.length);
  final ByteBuffer data = chunks[victim].getBuffer();
  // The relative put advances the position; mark/reset undoes that.
  data.mark();
  final byte current = data.get(data.position());
  data.put((byte) (current + 1));
  data.reset();
}
}
Loading

0 comments on commit 67702f2

Please sign in to comment.