Skip to content

Commit

Permalink
Merge branch 'apache:master' into HDDS-9171
Browse files Browse the repository at this point in the history
  • Loading branch information
devabhishekpal authored Aug 31, 2023
2 parents 510eac6 + 1d79d07 commit 690d4da
Show file tree
Hide file tree
Showing 14 changed files with 498 additions and 346 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -198,6 +199,16 @@ public synchronized void addFailedDatanodes(Collection<DatanodeDetails> dns) {
LOG.debug("{}: set failed indexes {}", this, failedDataIndexes);
}

/**
 * Returns the set of failed indexes. This will be empty if no errors were
 * encountered reading any of the block indexes, and no failed nodes were
 * added via {@link #addFailedDatanodes(Collection)}.
 * The returned set is a copy of the internal set, so it can be modified
 * by the caller without affecting this stream's state.
 *
 * @return a mutable copy of the internal set of failed EC indexes.
 */
public synchronized Set<Integer> getFailedIndexes() {
return new HashSet<>(failedDataIndexes);
}

/**
* Set the EC indexes that should be recovered by
* {@link #recoverChunks(ByteBuffer[])}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.utils.db;

import javax.annotation.Nonnull;
import java.util.function.IntFunction;

/**
* Codec to serialize/deserialize {@link Boolean}.
*/
public final class BooleanCodec implements Codec<Boolean> {

  private static final byte TRUE = 1;
  private static final byte FALSE = 0;
  private static final BooleanCodec INSTANCE = new BooleanCodec();

  /** @return the singleton instance of this codec. */
  public static BooleanCodec get() {
    return INSTANCE;
  }

  private BooleanCodec() {
    // singleton
  }

  @Override
  public boolean supportCodecBuffer() {
    return true;
  }

  @Override
  public CodecBuffer toCodecBuffer(Boolean object,
      IntFunction<CodecBuffer> allocator) {
    // BUG FIX: previously this always wrote TRUE regardless of the value,
    // so a serialized FALSE would round-trip back as true. Write the byte
    // that matches the actual value.
    return allocator.apply(1).put(object ? TRUE : FALSE);
  }

  @Override
  public Boolean fromCodecBuffer(@Nonnull CodecBuffer buffer) {
    // Compare against the named constant for consistency with the writer.
    return buffer.asReadOnlyByteBuffer().get() == TRUE;
  }

  @Override
  public byte[] toPersistedFormat(Boolean object) {
    return object ? new byte[]{TRUE} : new byte[]{FALSE};
  }

  @Override
  public Boolean fromPersistedFormat(byte[] rawData) {
    // A boolean is persisted as exactly one byte; anything else indicates
    // corrupt or mismatched data.
    if (rawData.length != 1) {
      throw new IllegalStateException("Byte Buffer for boolean should be of " +
          "length 1 but provided byte array of length " + rawData.length);
    }
    return rawData[0] == TRUE;
  }

  @Override
  public Boolean copyObject(Boolean object) {
    // Boolean is immutable, so returning the same reference is a safe copy.
    return object;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,17 @@ public CodecBuffer putLong(long n) {
return this;
}

/**
 * Writes a single byte at the current write position, advancing it by one.
 * Similar to {@link ByteBuffer#put(byte)}.
 *
 * @param val the byte value to append to this buffer.
 * @return this object.
 */
public CodecBuffer put(byte val) {
// NOTE(review): assertRefCnt(1) appears to guard against writing to a
// released buffer — confirm against the class's reference-count contract.
assertRefCnt(1);
buf.writeByte(val);
return this;
}

/**
* Similar to {@link ByteBuffer#put(byte[])}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -285,7 +286,31 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
.collect(Collectors.toSet()));
long length = safeBlockGroupLength;
while (length > 0) {
int readLen = sis.recoverChunks(bufs);
int readLen;
try {
readLen = sis.recoverChunks(bufs);
Set<Integer> failedIndexes = sis.getFailedIndexes();
if (!failedIndexes.isEmpty()) {
// There was a problem reading some of the block indexes, but we
// did not get an exception as there must have been spare indexes
// to try and recover from. Therefore we should log out the block
// group details in the same way as for the exception case below.
logBlockGroupDetails(blockLocationInfo, repConfig,
blockDataGroup);
}
} catch (IOException e) {
// When we see exceptions here, it could be due to some transient
// issue that causes the block read to fail when reconstructing it,
// but we have seen issues where the containers don't have the
// blocks they appear they should have, or the block chunks are the
// wrong length etc. In order to debug these sort of cases, if we
// get an error, we will log out the details about the block group
// length on each source, along with their chunk list and chunk
// lengths etc.
logBlockGroupDetails(blockLocationInfo, repConfig,
blockDataGroup);
throw e;
}
// TODO: can be submitted in parallel
for (int i = 0; i < bufs.length; i++) {
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
Expand All @@ -311,6 +336,43 @@ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
}
}

/**
 * Logs diagnostic details for every replica in an EC block group: the block
 * length, the block-group length recorded in the putBlock metadata, and the
 * offset/length of each chunk. Intended to aid debugging when reconstruction
 * encounters read failures or unexpected block/chunk lengths.
 *
 * @param blockLocationInfo location info of the block group being logged.
 * @param repConfig the EC replication config, included in the log output.
 * @param blockDataGroup per-replica block data, indexed by replica index - 1;
 *        null entries (no data for that index) are skipped.
 */
private void logBlockGroupDetails(BlockLocationInfo blockLocationInfo,
ECReplicationConfig repConfig, BlockData[] blockDataGroup) {
LOG.info("Block group details for {}. " +
"Replication Config {}. Calculated safe length: {}. ",
blockLocationInfo.getBlockID(), repConfig,
blockLocationInfo.getLength());
for (int i = 0; i < blockDataGroup.length; i++) {
BlockData data = blockDataGroup[i];
if (data == null) {
// No block data available for this replica index; nothing to log.
continue;
}
StringBuilder sb = new StringBuilder();
sb.append("Block Data for: ")
.append(data.getBlockID())
.append(" replica Index: ")
// Replica indexes are displayed 1-based, hence the +1 on the array index.
.append(i + 1)
.append(" block length: ")
.append(data.getSize())
.append(" block group length: ")
.append(getBlockDataLength(data))
.append(" chunk list: \n");
int cnt = 0;
for (ContainerProtos.ChunkInfo chunkInfo : data.getChunks()) {
if (cnt > 0) {
sb.append("\n");
}
sb.append(" chunkNum: ")
.append(++cnt)
.append(" length: ")
.append(chunkInfo.getLen())
.append(" offset: ")
.append(chunkInfo.getOffset());
}
LOG.info(sb.toString());
}
}

private void checkFailures(ECBlockOutputStream targetBlockStream,
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
currentPutBlkResponseFuture)
Expand Down Expand Up @@ -492,17 +554,24 @@ private long calcEffectiveBlockGroupLen(BlockData[] blockGroup,
continue;
}

String putBlockLenStr = blockGroup[i].getMetadata()
.get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
long putBlockLen = (putBlockLenStr == null) ?
Long.MAX_VALUE :
Long.parseLong(putBlockLenStr);
// Use the min to be conservative
long putBlockLen = getBlockDataLength(blockGroup[i]);
// The safe length is the minimum of the lengths recorded across the
// stripe.
blockGroupLen = Math.min(putBlockLen, blockGroupLen);
}
return blockGroupLen == Long.MAX_VALUE ? 0 : blockGroupLen;
}

/**
 * Reads the block-group length recorded in the putBlock metadata of the
 * given replica.
 *
 * Every replica should carry this key, so a missing value indicates a
 * problem with the stripe. In that case 0 is returned, which makes the
 * caller treat the block as zero-length and skip reconstructing it.
 *
 * @param blockData the replica's block data to inspect.
 * @return the recorded block-group length, or 0 if the metadata key is absent.
 */
private long getBlockDataLength(BlockData blockData) {
  final String recordedLen =
      blockData.getMetadata().get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
  if (recordedLen == null) {
    return 0;
  }
  return Long.parseLong(recordedLen);
}

/** @return the metrics object tracking EC reconstruction activity. */
public ECReconstructionMetrics getECReconstructionMetrics() {
  return metrics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public static Builder newBuilder() {
.addCodec(String.class, StringCodec.get())
.addCodec(Long.class, LongCodec.get())
.addCodec(Integer.class, IntegerCodec.get())
.addCodec(byte[].class, ByteArrayCodec.get());
.addCodec(byte[].class, ByteArrayCodec.get())
.addCodec(Boolean.class, BooleanCodec.get());
}

private static final class CodecMap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ public void testSnapDiffWithNoEffectiveRename() throws Exception {
key1);
Assert.assertEquals(diff.getDiffList(), Arrays.asList(
SnapshotDiffReportOzone.getDiffReportEntry(
SnapshotDiffReport.DiffType.MODIFY, key1)));
SnapshotDiffReport.DiffType.RENAME, key1, key1)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.service.SnapshotDiffCleanupService;
import org.apache.hadoop.ozone.om.snapshot.SnapshotDiffObject;
import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
Expand Down Expand Up @@ -392,7 +391,6 @@ private static CodecRegistry createCodecRegistryForSnapDiff() {
registry.addCodec(SnapshotDiffReportOzone.DiffReportEntry.class,
SnapshotDiffReportOzone.getDiffReportEntryCodec());
registry.addCodec(SnapshotDiffJob.class, SnapshotDiffJob.getCodec());
registry.addCodec(SnapshotDiffObject.class, SnapshotDiffObject.getCodec());
return registry.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ public UUID getNextSnapshotId() {
return nextSnapshotId;
}

/** @return true if a next snapshot ID has been set, false otherwise. */
public boolean hasNextSnapshotId() {
  return getNextSnapshotId() != null;
}

/** @return true if a previous snapshot ID has been set, false otherwise. */
public boolean hasPreviousSnapshotId() {
  return getPreviousSnapshotId() != null;
}

/** @return the previous snapshot's ID, or null if none is set. */
public UUID getPreviousSnapshotId() {
return previousSnapshotId;
}
Expand Down
Loading

0 comments on commit 690d4da

Please sign in to comment.