diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 6dde8a744c8..723ac81281d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -270,6 +270,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
         this.blockInputStreamFactory, byteBufferPool,
         this.ecReconstructReadExecutor, clientConfig)) {
 
+      ecValidator.setBlockLength(blockLocationInfo.getLength());
       ECBlockOutputStream[] targetBlockStreams =
           new ECBlockOutputStream[toReconstructIndexes.size()];
 
@@ -294,8 +295,10 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
       }
 
       if (toReconstructIndexes.size() > 0) {
-        sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
-            .collect(Collectors.toSet()));
+        Set<Integer> recoveryIndexes = toReconstructIndexes.stream()
+            .map(i -> (i - 1)).collect(Collectors.toSet());
+        sis.setRecoveryIndexes(recoveryIndexes);
+        ecValidator.setReconstructionIndexes(recoveryIndexes);
         long length = safeBlockGroupLength;
         while (length > 0) {
           int readLen;
@@ -332,7 +335,6 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
               future = targetBlockStreams[i].write(bufs[i]);
               checkFailures(targetBlockStreams[i], future);
             }
-            ecValidator.validateBuffer(bufs[i], targetBlockStreams[i], i);
             bufs[i].clear();
           }
           length -= readLen;
@@ -341,6 +343,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
       List<ECBlockOutputStream> allStreams =
           new ArrayList<>(Arrays.asList(targetBlockStreams));
       allStreams.addAll(Arrays.asList(emptyBlockStreams));
       for (ECBlockOutputStream targetStream : allStreams) {
+        ecValidator.validateChecksum(targetStream, blockDataGroup);
         targetStream.executePutBlock(true, true, blockLocationInfo.getLength(), blockDataGroup);
         checkFailures(targetStream, targetStream.getCurrentPutBlkResponseFuture());
       }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java
index 40b6d395186..f3453a84d11 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECValidator.java
@@ -1,5 +1,6 @@
 package org.apache.hadoop.ozone.container.ec.reconstruction;
 
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
@@ -7,46 +8,103 @@
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
 
 public class ECValidator {
 
   private static final Logger LOG = LoggerFactory.getLogger(ECValidator.class);
-  private Checksum checksum = null;
   private final boolean isValidationEnabled;
+  // State pushed in by ECReconstructionCoordinator before validation runs.
+  private Collection<Integer> reconstructionIndexes;
+  private final int parityCount;
+  private long blockLength;
+  private final ECReplicationConfig ecReplicationConfig;
 
-  ECValidator(OzoneClientConfig config) {
-    this.checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum());
+  ECValidator(OzoneClientConfig config, ECReplicationConfig ecReplConfig) {
     // We fetch the configuration value beforehand to avoid re-fetching on every validation call
     isValidationEnabled = config.getEcReconstructionValidation();
+    ecReplicationConfig = ecReplConfig;
+    parityCount = ecReplConfig.getParity();
+  }
+
+  /** Records the 0-based indexes that are being reconstructed. */
+  public void setReconstructionIndexes(Collection<Integer> reconstructionIndexes) {
+    this.reconstructionIndexes = reconstructionIndexes;
+  }
+
+  /** Records the length of the block group being reconstructed. */
+  public void setBlockLength(long blockLength) {
+    this.blockLength = blockLength;
+  }
+
+  /**
+   * Verifies that every checksum of a recreated chunk occurs in the stripe
+   * checksum of the corresponding original chunk.  A stripe checksum is the
+   * concatenation of the fixed-width checksums of all chunks in the stripe,
+   * so each recreated checksum must equal one width-aligned segment of it.
+   * NOTE(review): assumes all checksums in a stripe have equal width — TODO
+   * confirm against the writer side.
+   *
+   * @param checksumData  checksum data of the recreated chunk
+   * @param stripeChecksum stripe checksum taken from the original block group
+   * @throws OzoneChecksumException when a recreated checksum is not found
+   */
+  private void validateChecksumInStripe(ContainerProtos.ChecksumData checksumData,
+      ByteString stripeChecksum) throws OzoneChecksumException {
+    for (ByteString recreated : checksumData.getChecksumsList()) {
+      int width = recreated.size();
+      if (width == 0) {
+        // Checksum type NONE yields empty checksums; nothing to compare.
+        continue;
+      }
+      boolean matched = false;
+      for (int off = 0; off + width <= stripeChecksum.size(); off += width) {
+        if (recreated.equals(stripeChecksum.substring(off, off + width))) {
+          matched = true;
+          break;
+        }
+      }
+      if (!matched) {
+        throw new OzoneChecksumException(
+            "Inconsistent checksum for recreated chunk and original stripe checksum");
+      }
+    }
+  }
+
+  /**
+   * Returns the block data whose first chunk carries a stripe checksum, or
+   * null when none does.  Traverses from the last (parity) index backwards
+   * because every parity replica stores the stripe checksum.
+   */
+  private BlockData getChecksumBlockData(BlockData[] blockDataGroup) {
+    for (int i = blockDataGroup.length - 1; i >= 0; i--) {
+      BlockData blockData = blockDataGroup[i];
+      if (null == blockData) {
+        continue;
+      }
+      List<ContainerProtos.ChunkInfo> chunks = blockData.getChunks();
+      if (null != chunks && !chunks.isEmpty() && chunks.get(0).hasStripeChecksum()) {
+        return blockData;
+      }
+    }
+    return null;
   }
 
   /**
-   * Helper function to validate the checksum between recreated data and
+   * Validates the recreated data held by a reconstructed stream against the
+   * stripe checksums stored in the original block group.
    * @param ecBlockOutputStream A {@link ECBlockOutputStream} instance that stores
    *                            the reconstructed index ECBlockOutputStream
-   * @param idx Used to store the index at which data was recreated
+   * @param blockDataGroup block data of every index in the EC group; the
+   *                       stripe checksum is read from the first index
+   *                       (searched from the parity side) that carries one
    * @throws OzoneChecksumException if the recreated checksum and the block checksum doesn't match
    */
-  public void validateBuffer(ByteBuffer buf, ECBlockOutputStream ecBlockOutputStream, int idx)
-    throws OzoneChecksumException{
-    if (isValidationEnabled) {
-      try (ChunkBuffer chunk = ChunkBuffer.wrap(buf)) {
-        //Checksum will be stored in the 1st chunk and parity chunks
-        ContainerProtos.ChecksumData stripeChecksum = ecBlockOutputStream.getContainerBlockData()
-            .getChunks(0).getChecksumData();
-        ContainerProtos.ChecksumData chunkChecksum = checksum.computeChecksum(chunk).getProtoBufMessage();
-        if (stripeChecksum.getChecksums(idx) != chunkChecksum.getChecksums(0)) {
-          LOG.info("Checksum mismatched between recreated chunk and re-created chunk");
-          throw new OzoneChecksumException("Inconsistent checksum for re-created chunk and original chunk");
-        }
-      }
-    } else {
-      LOG.debug("Checksum validation was disabled, skipping check");
-    }
-  }
+  public void validateChecksum(ECBlockOutputStream ecBlockOutputStream,
+      BlockData[] blockDataGroup) throws OzoneChecksumException {
+    if (!isValidationEnabled) {
+      LOG.debug("Checksum validation was disabled, skipping check");
+      return;
+    }
+    // Checksums of the recreated chunks come from the freshly written stream.
+    List<ContainerProtos.ChunkInfo> recreatedChunks =
+        ecBlockOutputStream.getContainerBlockData().getChunksList();
+    BlockData checksumBlockData = getChecksumBlockData(blockDataGroup);
+    if (null == checksumBlockData) {
+      throw new OzoneChecksumException(
+          "Could not find checksum data in any index of blockDataGroup while validating");
+    }
+    List<ContainerProtos.ChunkInfo> checksumBlockChunks = checksumBlockData.getChunks();
+    // Guard the parallel iteration below against a chunk-count mismatch.
+    if (recreatedChunks.size() > checksumBlockChunks.size()) {
+      throw new OzoneChecksumException("Recreated " + recreatedChunks.size()
+          + " chunks but the original block only has "
+          + checksumBlockChunks.size());
+    }
+    for (int i = 0; i < recreatedChunks.size(); i++) {
+      validateChecksumInStripe(recreatedChunks.get(i).getChecksumData(),
+          checksumBlockChunks.get(i).getStripeChecksum());
+    }
+  }