Skip to content

Commit

Permalink
HDDS-11418. leader execution flow (#7211)
Browse files Browse the repository at this point in the history
  • Loading branch information
sumitagrawl committed Sep 20, 2024
1 parent 88dd436 commit 7d8b0c9
Show file tree
Hide file tree
Showing 36 changed files with 1,872 additions and 20 deletions.
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4536,4 +4536,12 @@
maximum number of buckets across all volumes.
</description>
</property>
<property>
<name>ozone.om.leader.executor.enable</name>
<value>true</value>
<tag>OZONE, OM</tag>
<description>
flag to enable / disable experimental feature for leader size execution for performance.
</description>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ public static Codec<TransactionInfo> getCodec() {

public static TransactionInfo valueOf(String transactionInfo) {
final String[] tInfo = transactionInfo.split(TRANSACTION_INFO_SPLIT_KEY);
Preconditions.checkArgument(tInfo.length == 2,
Preconditions.checkArgument(tInfo.length >= 2 && tInfo.length <= 3,
"Unexpected split length: %s in \"%s\"", tInfo.length, transactionInfo);

try {
return valueOf(Long.parseLong(tInfo[0]), Long.parseLong(tInfo[1]));
Long index = null;
if (tInfo.length == 3) {
index = Long.parseLong(tInfo[2]);
}
return valueOf(Long.parseLong(tInfo[0]), Long.parseLong(tInfo[1]), index);
} catch (Exception e) {
throw new IllegalArgumentException("Failed to parse " + transactionInfo, e);
}
Expand All @@ -67,6 +71,14 @@ public static TransactionInfo valueOf(long currentTerm, long transactionIndex) {
return valueOf(TermIndex.valueOf(currentTerm, transactionIndex));
}

public static TransactionInfo valueOf(long currentTerm, long transactionIndex, Long index) {
return valueOf(TermIndex.valueOf(currentTerm, transactionIndex), index);
}

public static TransactionInfo valueOf(TermIndex termIndex, Long index) {
return new TransactionInfo(termIndex, index);
}

public static TransactionInfo valueOf(TermIndex termIndex) {
return new TransactionInfo(termIndex);
}
Expand Down Expand Up @@ -98,9 +110,19 @@ public static TermIndex getTermIndex(long transactionIndex) {
private final SnapshotInfo snapshotInfo;
/** The string need to be persisted in OM DB. */
private final String transactionInfoString;
private final Long index;

private TransactionInfo(TermIndex termIndex) {
this.transactionInfoString = termIndex.getTerm() + TRANSACTION_INFO_SPLIT_KEY + termIndex.getIndex();
this(termIndex, null);
}
private TransactionInfo(TermIndex termIndex, Long index) {
this.index = index;
if (null == index) {
this.transactionInfoString = termIndex.getTerm() + TRANSACTION_INFO_SPLIT_KEY + termIndex.getIndex();
} else {
this.transactionInfoString = termIndex.getTerm() + TRANSACTION_INFO_SPLIT_KEY + termIndex.getIndex()
+ TRANSACTION_INFO_SPLIT_KEY + index;
}
this.snapshotInfo = new SnapshotInfo() {
@Override
public TermIndex getTermIndex() {
Expand Down Expand Up @@ -136,6 +158,10 @@ public TermIndex getTermIndex() {
return snapshotInfo.getTermIndex();
}

public Long getIndex() {
return index;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -145,12 +171,12 @@ public boolean equals(Object o) {
return false;
}
TransactionInfo that = (TransactionInfo) o;
return this.getTermIndex().equals(that.getTermIndex());
return this.getTermIndex().equals(that.getTermIndex()) && Objects.equals(that.getIndex(), getIndex());
}

@Override
public int hashCode() {
return Objects.hash(getTerm(), getTransactionIndex());
return Objects.hash(getTerm(), getTransactionIndex(), getIndex());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,24 @@ String batchSizeDiscardedString() {
countSize2String(discardedCount, discardedSize));
}

public void retrieveCache(Map<ByteBuffer, ByteBuffer> dataMap) {
for (Map.Entry<Bytes, Object> d : ops.entrySet()) {
Bytes key = d.getKey();
Object value = d.getValue();
if (value instanceof byte[]) {
dataMap.put(ByteBuffer.wrap(key.array()), ByteBuffer.wrap((byte[]) value));
} else if (value instanceof CodecBuffer) {
dataMap.put(key.asReadOnlyByteBuffer(), ((CodecBuffer) value).asReadOnlyByteBuffer());
} else if (value == Op.DELETE) {
dataMap.put(ByteBuffer.wrap(key.array()), null);
} else {
throw new IllegalStateException("Unexpected value: " + value
+ ", class=" + value.getClass().getSimpleName());
}
}
isCommit = true;
}

@Override
public String toString() {
return name + ": " + family.getName();
Expand Down Expand Up @@ -320,6 +338,15 @@ String getCommitString() {
countSize2String(discardedCount, discardedSize),
countSize2String(opCount - discardedCount, opSize - discardedSize));
}

public Map<String, Map<ByteBuffer, ByteBuffer>> getCachedTransaction() {
Map<String, Map<ByteBuffer, ByteBuffer>> tableMap = new HashMap<>();
for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
Map<ByteBuffer, ByteBuffer> dataMap = tableMap.computeIfAbsent(e.getKey(), (p) -> new HashMap<>());
e.getValue().retrieveCache(dataMap);
}
return tableMap;
}
}

private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
Expand Down Expand Up @@ -378,4 +405,8 @@ public void put(ColumnFamily family, byte[] key, byte[] value)
throws IOException {
opCache.put(family, key, value);
}

public Map<String, Map<ByteBuffer, ByteBuffer>> getCachedTransaction() {
return opCache.getCachedTransaction();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ void deleteBatchWithPrefix(BatchOperation batch, KEY prefix)
*/
void loadFromFile(File externalFile) throws IOException;

default Table getRawTable() {
return this;
}

/**
* Class used to represent the key and value pair of a db entry.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,10 +549,15 @@ public void cleanupCache(List<Long> epochs) {
}

@VisibleForTesting
TableCache<KEY, VALUE> getCache() {
public TableCache<KEY, VALUE> getCache() {
return cache;
}

@Override
public Table getRawTable() {
return rawTable;
}

/**
* Key value implementation for strongly typed tables.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ public static boolean isReadOnly(
case AbortExpiredMultiPartUploads:
case SetSnapshotProperty:
case QuotaRepair:
case PersistDb:
case UnknownCommand:
return false;
case EchoRPC:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,4 +623,7 @@ private OMConfigKeys() {
public static final String OZONE_OM_MAX_BUCKET =
"ozone.om.max.buckets";
public static final int OZONE_OM_MAX_BUCKET_DEFAULT = 100000;

public static final String OZONE_OM_LEADER_EXECUTOR_ENABLE = "ozone.om.leader.executor.enable";
public static final boolean OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.hadoop.util.Time;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.TestClock;
import org.apache.ozone.test.tag.Unhealthy;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -1882,6 +1883,7 @@ private void createLinkBucket(OzoneVolume sourceVolume, String sourceBucket,
}

@Test
@Unhealthy("HDDS-11415 handle with lockDetails changes")
public void testProcessingDetails() throws IOException, InterruptedException {
final Logger log = LoggerFactory.getLogger(
"org.apache.hadoop.ipc.ProcessingDetails");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Unhealthy;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -306,6 +307,7 @@ public void testDeleteWithMultiLevels() throws Exception {
}

@Test
@Unhealthy("HDDS-11415 To be removed as not applicable with new flow")
public void testDeleteWithMultiLevelsBlockDoubleBuffer() throws Exception {
Path root = new Path("/rootDirdd");
Path appRoot = new Path(root, "appRoot");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Unhealthy;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -117,6 +118,7 @@ public void shutdown() {
}

@Test
@Unhealthy("HDDS-11415 testcase check for volume/bucket not found on follower node, fix with no-cache")
void testAllSCMAreRunning() throws Exception {
int count = 0;
List<StorageContainerManager> scms = cluster.getStorageContainerManagers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@
import static org.junit.jupiter.api.Assertions.fail;
import static org.slf4j.event.Level.DEBUG;

import org.apache.ozone.test.tag.Unhealthy;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -4841,6 +4842,7 @@ public void testUploadWithStreamAndMemoryMappedBuffer() throws IOException {
}

@Test
@Unhealthy("HDDS-11415 To be removed as not applicable with new flow")
public void testParallelDeleteBucketAndCreateKey() throws IOException,
InterruptedException, TimeoutException {
assumeThat(getCluster().getOzoneManager().isRatisEnabled()).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.apache.ozone.test.tag.Unhealthy;
import org.apache.ratis.grpc.server.GrpcLogAppender;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
Expand All @@ -72,6 +73,7 @@
* Test for OM bootstrap process.
*/
@Timeout(500)
@Unhealthy("HDDS-11415 OzoneManager Statemachine to be removed")
public class TestAddRemoveOzoneManager {

private MiniOzoneHAClusterImpl cluster = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.utils.FaultInjectorImpl;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Unhealthy;
import org.apache.ratis.server.protocol.TermIndex;
import org.assertj.core.api.Fail;
import org.jline.utils.Log;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -386,6 +388,7 @@ private void checkSnapshot(OzoneManager leaderOM, OzoneManager followerOM,

@Test
@Timeout(300)
@Unhealthy("HDDS-11415 local passes but remote fails, follower on start unable to call get snapshot")
public void testInstallIncrementalSnapshot(@TempDir Path tempDir)
throws Exception {
// Get the leader OM
Expand Down Expand Up @@ -596,6 +599,7 @@ private IncrementData getNextIncrementalTarball(

@Test
@Timeout(300)
@Unhealthy("HDDS-11415 local passes but remote fails, follower on start unable to call get snapshot")
public void testInstallIncrementalSnapshotWithFailure() throws Exception {
// Get the leader OM
String leaderOMNodeId = OmFailoverProxyUtil
Expand All @@ -622,7 +626,8 @@ public void testInstallIncrementalSnapshotWithFailure() throws Exception {

// Start the inactive OM. Checkpoint installation will happen spontaneously.
cluster.startInactiveOM(followerNodeId);

Log.info("Leader Node {}-{}, Follower Node {}", leaderOMNodeId, cluster.isOMActive(leaderOMNodeId),
followerNodeId, cluster.isOMActive(followerNodeId));
// Wait the follower download the snapshot,but get stuck by injector
GenericTestUtils.waitFor(() -> {
return followerOM.getOmSnapshotProvider().getNumDownloaded() == 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.ozone.om.ratis.OzoneManagerStateMachine;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages;

import org.apache.ozone.test.tag.Unhealthy;
import org.apache.ratis.util.LifeCycle;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -53,6 +54,7 @@
* Tests for OM upgrade finalization.
* TODO: can be merged into class with other OM tests with per-method cluster
*/
@Unhealthy("HDDS-11415 To fix upgrade prepare")
class TestOMUpgradeFinalization {
static {
AuditLogTestUtils.enableAuditLog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
* Test OM prepare against actual mini cluster.
*/
@Flaky("HDDS-5990")
@Unhealthy("HDDS-11415 To fix upgrade prepare and verify")
public class TestOzoneManagerPrepare extends TestOzoneManagerHA {
private static final String BUCKET = "bucket";
private static final String VOLUME = "volume";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Unhealthy;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -107,6 +108,7 @@ public static void cleanUp() {

// Test snapshot diff when OM restarts in HA OM env.
@Test
@Unhealthy("HDDS-11415 follower cache update")
public void testSnapshotDiffWhenOmLeaderRestart()
throws Exception {
String snapshot1 = "snap-" + RandomStringUtils.randomNumeric(10);
Expand Down Expand Up @@ -163,6 +165,7 @@ public void testSnapshotDiffWhenOmLeaderRestart()
}

@Test
@Unhealthy("HDDS-11415 follower cache update")
public void testSnapshotIdConsistency() throws Exception {
createFileKey(ozoneBucket, "key-" + RandomStringUtils.randomNumeric(10));

Expand Down Expand Up @@ -200,6 +203,7 @@ public void testSnapshotIdConsistency() throws Exception {
* passed or empty.
*/
@Test
@Unhealthy("HDDS-11415 follower cache update")
public void testSnapshotNameConsistency() throws Exception {
store.createSnapshot(volumeName, bucketName, "");
List<OzoneManager> ozoneManagers = cluster.getOzoneManagersList();
Expand Down Expand Up @@ -282,6 +286,7 @@ private void createFileKey(OzoneBucket bucket, String keyName)
* and purgeSnapshot in same batch.
*/
@Test
@Unhealthy("HDDS-11415 om statemachine change and follower cache update")
public void testKeyAndSnapshotDeletionService() throws IOException, InterruptedException, TimeoutException {
OzoneManager omLeader = cluster.getOMLeader();
OzoneManager omFollower;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.apache.ozone.test.tag.Unhealthy;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
Expand Down Expand Up @@ -166,6 +167,7 @@ public void testSnapshotSplitAndMove() throws Exception {

@Test
@Order(1)
@Unhealthy("HDDS-11415 follower cache issue, to be fixed")
public void testMultipleSnapshotKeyReclaim() throws Exception {

Table<String, RepeatedOmKeyInfo> deletedTable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public void testUpdateTransactionInfoTable() throws Exception {
CommandLine cmd = new CommandLine(new RDBRepair()).addSubcommand(new TransactionInfoRepair());
String dbPath = new File(OMStorage.getOmDbDir(conf) + "/" + OM_DB_NAME).getPath();

// create a volume to ensure transactionInfo is updated if its new environment
cluster.newClient().getObjectStore().createVolume("vol");
cluster.getOzoneManager().stop();

String cmdOut = scanTransactionInfoTable(dbPath);
Expand Down
Loading

0 comments on commit 7d8b0c9

Please sign in to comment.