Skip to content

Commit

Permalink
Todo reconstructor
Browse files Browse the repository at this point in the history
  • Loading branch information
devabhishekpal committed Nov 14, 2024
1 parent 5516cce commit db7b00d
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
this.blockInputStreamFactory, byteBufferPool,
this.ecReconstructReadExecutor,
clientConfig)) {
ecValidator.setBlockLength(blockLocationInfo.getLength());

ECBlockOutputStream[] targetBlockStreams =
new ECBlockOutputStream[toReconstructIndexes.size()];
Expand All @@ -294,8 +295,10 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
}

if (toReconstructIndexes.size() > 0) {
sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
.collect(Collectors.toSet()));
Set<Integer> recoveryIndexes = toReconstructIndexes.stream().map(i -> (i - 1))
.collect(Collectors.toSet());
sis.setRecoveryIndexes(recoveryIndexes);
ecValidator.setReconstructionIndexes(recoveryIndexes);
long length = safeBlockGroupLength;
while (length > 0) {
int readLen;
Expand Down Expand Up @@ -332,7 +335,6 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
future = targetBlockStreams[i].write(bufs[i]);
checkFailures(targetBlockStreams[i], future);
}
ecValidator.validateBuffer(bufs[i], targetBlockStreams[i], i);
bufs[i].clear();
}
length -= readLen;
Expand All @@ -341,6 +343,7 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
List<ECBlockOutputStream> allStreams = new ArrayList<>(Arrays.asList(targetBlockStreams));
allStreams.addAll(Arrays.asList(emptyBlockStreams));
for (ECBlockOutputStream targetStream : allStreams) {
ecValidator.validateBuffer(targetStream);
targetStream.executePutBlock(true, true, blockLocationInfo.getLength(), blockDataGroup);
checkFailures(targetStream, targetStream.getCurrentPutBlkResponseFuture());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,100 @@
package org.apache.hadoop.ozone.container.ec.reconstruction;

import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;

public class ECValidator {

private static final Logger LOG =
LoggerFactory.getLogger(ECValidator.class);
private Checksum checksum = null;
private final boolean isValidationEnabled;
private Collection<Integer> reconstructionIndexes;
private final int parityCount;
private long blockLength;
private final ECReplicationConfig ecReplicationConfig;

ECValidator(OzoneClientConfig config) {
this.checksum = new Checksum(config.getChecksumType(), config.getBytesPerChecksum());
ECValidator(OzoneClientConfig config, ECReplicationConfig ecReplConfig) {
// We fetch the configuration value beforehand to avoid re-fetching on every validation call
isValidationEnabled = config.getEcReconstructionValidation();
ecReplicationConfig = ecReplConfig;
parityCount = ecReplConfig.getParity();
}

public void setReconstructionIndexes(Collection<Integer> reconstructionIndexes) {
this.reconstructionIndexes = reconstructionIndexes;
}

public void setBlockLength(long blockLength) {
this.blockLength = blockLength;
}

private boolean validateChecksumInStripe(ContainerProtos.ChecksumData checksumData, ByteString stripeChecksum)
throws OzoneChecksumException{

int bytesPerChecksum = checksumData.getBytesPerChecksum();
ByteString checksum = stripeChecksum.substring();
}

private BlockData getChecksumBlockData(BlockData[] blockDataGroup) {
BlockData checksumBlockData = null;
// Reverse traversal as all parity bits will have checmsumBytes
for (int i = blockDataGroup.length - 1; i >= 0; i--) {
BlockData blockData = blockDataGroup[i];
if (null == blockData) {
continue;
}

List<ContainerProtos.ChunkInfo> chunks = blockData.getChunks();
if (null != chunks && !(chunks.isEmpty())) {
if (chunks.get(0).hasStripeChecksum()) {
checksumBlockData = blockData;
break;
}
}
}

return checksumBlockData;
}

/**
* Helper function to validate the checksum between recreated data and
* @param buf A {@link ByteBuffer} instance that stores the chunk
* @param ecBlockOutputStream A {@link ECBlockOutputStream} instance that stores
* the reconstructed index ECBlockOutputStream
* @param idx Used to store the index at which data was recreated
* @throws OzoneChecksumException if the recreated checksum and the block checksum doesn't match
*/
public void validateBuffer(ByteBuffer buf, ECBlockOutputStream ecBlockOutputStream, int idx)
public void validateChecksum(ECBlockOutputStream ecBlockOutputStream, BlockData[] blockDataGroup)
throws OzoneChecksumException{
if (isValidationEnabled) {
try (ChunkBuffer chunk = ChunkBuffer.wrap(buf)) {
//Checksum will be stored in the 1st chunk and parity chunks
ContainerProtos.ChecksumData stripeChecksum = ecBlockOutputStream.getContainerBlockData()
.getChunks(0).getChecksumData();
ContainerProtos.ChecksumData chunkChecksum = checksum.computeChecksum(chunk).getProtoBufMessage();
if (stripeChecksum.getChecksums(idx) != chunkChecksum.getChecksums(0)) {
LOG.info("Checksum mismatched between recreated chunk and re-created chunk");
throw new OzoneChecksumException("Inconsistent checksum for re-created chunk and original chunk");
}

//Checksum will be stored in the 1st chunk and parity chunks
List<ContainerProtos.ChunkInfo> recreatedChunks = ecBlockOutputStream.getContainerBlockData().getChunksList();
BlockData checksumBlockData = getChecksumBlockData(blockDataGroup);
if (null == checksumBlockData) {
throw new OzoneChecksumException("Could not find checksum data in any index for blockDataGroup while validating");
}
List<ContainerProtos.ChunkInfo> checksumBlockChunks = checksumBlockData.getChunks();

for (int i = 0; i < recreatedChunks.size(); i++) {
validateChecksumInStripe(recreatedChunks.get(i).getChecksumData(), checksumBlockChunks.get(i).getStripeChecksum());
}


} else {
LOG.debug("Checksum validation was disabled, skipping check");
}
Expand Down

0 comments on commit db7b00d

Please sign in to comment.