From 9af1d8df7a88e1e4b247b0be034a2ad699ee5b37 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Mon, 7 Aug 2023 02:39:51 -0700 Subject: [PATCH] HDDS-9094. [Snapshot] Added data integrity integration test for SST filtering service (#5128) --- .../ozone/om/TestSstFilteringService.java | 385 +++++++++++------- 1 file changed, 247 insertions(+), 138 deletions(-) diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java index 19f6d031c84..64c3e36599c 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java @@ -18,80 +18,81 @@ */ package org.apache.hadoop.ozone.om; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; -import org.apache.hadoop.hdds.server.ServerUtils; -import org.apache.hadoop.hdds.utils.db.DBConfigFromFile; import org.apache.hadoop.hdds.utils.db.DBProfile; import org.apache.hadoop.hdds.utils.db.RDBStore; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.lock.BootstrapStateHandler; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; +import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted; +import org.apache.hadoop.ozone.om.snapshot.SnapshotCache; import org.apache.hadoop.security.authentication.client.AuthenticationException; -import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.util.ExitUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.awaitility.core.ConditionTimeoutException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.rocksdb.LiveFileMetaData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE; +import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; +import static org.awaitility.Awaitility.with; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test SST Filtering Service. */ public class TestSstFilteringService { - @Rule - public TemporaryFolder folder = new TemporaryFolder(); + @TempDir + private File folder; private OzoneManagerProtocol writeClient; private OzoneManager om; - private static final Logger LOG = - LoggerFactory.getLogger(TestSstFilteringService.class); + private OzoneConfiguration conf; + private KeyManager keyManager; - @BeforeClass + @BeforeAll public static void setup() { ExitUtils.disableSystemExit(); } - private OzoneConfiguration createConfAndInitValues() throws IOException { - OzoneConfiguration conf = new OzoneConfiguration(); - File newFolder = folder.newFolder(); - if (!newFolder.exists()) { - Assert.assertTrue(newFolder.mkdirs()); - } - System.setProperty(DBConfigFromFile.CONFIG_DIR, "/"); - ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString()); + @BeforeEach + public void init() throws AuthenticationException, IOException { + conf = new OzoneConfiguration(); + conf.set(OZONE_METADATA_DIRS, folder.getAbsolutePath()); conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, TimeUnit.MILLISECONDS); conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 100, @@ -99,11 +100,20 @@ private OzoneConfiguration createConfAndInitValues() throws IOException { conf.setEnum(HDDS_DB_PROFILE, DBProfile.TEST); conf.setQuietMode(false); - return conf; + OmTestManagers omTestManagers = new OmTestManagers(conf); + keyManager = omTestManagers.getKeyManager(); + writeClient = omTestManagers.getWriteClient(); + om = omTestManagers.getOzoneManager(); } - @After + @AfterEach public void cleanup() throws Exception { + if (keyManager != null) { + keyManager.stop(); + } + if (writeClient != null) { + writeClient.close(); + } if (om != null) { om.stop(); } @@ -113,99 +123,91 @@ public void cleanup() throws Exception { * Test checks whether for existing snapshots * the checkpoint should not have any sst files that do not correspond to * the bucket on which create snapshot command was issued. - * + *

* The SSTFiltering service deletes only the last level of * sst file (rocksdb behaviour). - * + *

* 1. Create Keys for vol1/buck1 (L0 ssts will be created for vol1/buck1) * 2. compact the db (new level SSTS will be created for vol1/buck1) * 3. Create keys for vol1/buck2 (L0 ssts will be created for vol1/buck2) * 4. Take snapshot on vol1/buck2. * 5. The snapshot will contain compacted sst files pertaining to vol1/buck1 - * Wait till the BG service deletes these. + * Wait till the BG service deletes these. * * @throws IOException - on Failure. */ - @Test public void testIrrelevantSstFileDeletion() - throws IOException, TimeoutException, InterruptedException, - AuthenticationException { - OzoneConfiguration conf = createConfAndInitValues(); - OmTestManagers omTestManagers = new OmTestManagers(conf); - KeyManager keyManager = omTestManagers.getKeyManager(); - writeClient = omTestManagers.getWriteClient(); - om = omTestManagers.getOzoneManager(); - RDBStore store = (RDBStore) om.getMetadataManager().getStore(); - - final int keyCount = 100; - createKeys(keyManager, "vol1", "buck1", keyCount / 2, 1); - SstFilteringService sstFilteringService = + throws IOException, InterruptedException { + RDBStore activeDbStore = (RDBStore) om.getMetadataManager().getStore(); + SstFilteringService filteringService = keyManager.getSnapshotSstFilteringService(); - String rocksDbDir = om.getRocksDbDirectory(); - store.getDb().flush(OmMetadataManagerImpl.KEY_TABLE); + final int keyCount = 100; + String volumeName = "vol1"; + String bucketName1 = "buck1"; + createVolumeAndBucket(volumeName, bucketName1); + + createKeys(volumeName, bucketName1, keyCount / 2); + activeDbStore.getDb().flush(OmMetadataManagerImpl.KEY_TABLE); - createKeys(keyManager, "vol1", "buck1", keyCount / 2, 1); - store.getDb().flush(OmMetadataManagerImpl.KEY_TABLE); + createKeys(volumeName, bucketName1, keyCount / 2); + activeDbStore.getDb().flush(OmMetadataManagerImpl.KEY_TABLE); int level0FilesCount = 0; int totalFileCount = 0; - List initialsstFileList = store.getDb().getSstFileList(); + List initialsstFileList = + activeDbStore.getDb().getSstFileList(); for (LiveFileMetaData fileMetaData : initialsstFileList) { totalFileCount++; if (fileMetaData.level() == 0) { level0FilesCount++; } } - LOG.debug("Total files : {}", totalFileCount); - LOG.debug("Total L0 files: {}", level0FilesCount); - Assert.assertEquals(totalFileCount, level0FilesCount); + assertEquals(totalFileCount, level0FilesCount); - store.getDb().compactRange(OmMetadataManagerImpl.KEY_TABLE); + activeDbStore.getDb().compactRange(OmMetadataManagerImpl.KEY_TABLE); - int level0FilesCountAfterCompact = 0; - int totalFileCountAfterCompact = 0; - int nonlevel0FilesCountAfterCompact = 0; - List nonlevelOFiles = new ArrayList<>(); + int nonLevel0FilesCountAfterCompact = 0; - for (LiveFileMetaData fileMetaData : store.getDb().getSstFileList()) { - totalFileCountAfterCompact++; - if (fileMetaData.level() == 0) { - level0FilesCountAfterCompact++; - } else { - nonlevel0FilesCountAfterCompact++; - nonlevelOFiles.add(fileMetaData); + List nonLevelOFiles = new ArrayList<>(); + for (LiveFileMetaData fileMetaData : activeDbStore.getDb() + .getSstFileList()) { + if (fileMetaData.level() != 0) { + nonLevel0FilesCountAfterCompact++; + nonLevelOFiles.add(fileMetaData); } } - LOG.debug("Total files : {}", totalFileCountAfterCompact); - LOG.debug("Total L0 files: {}", level0FilesCountAfterCompact); - LOG.debug("Total non L0/compacted files: {}", - nonlevel0FilesCountAfterCompact); - - Assert.assertTrue(nonlevel0FilesCountAfterCompact > 0); + assertTrue(nonLevel0FilesCountAfterCompact > 0); - createKeys(keyManager, "vol1", "buck2", keyCount, 1); + String bucketName2 = "buck2"; + createVolumeAndBucket(volumeName, bucketName2); + createKeys(volumeName, bucketName2, keyCount); - store.getDb().flush(OmMetadataManagerImpl.KEY_TABLE); + activeDbStore.getDb().flush(OmMetadataManagerImpl.KEY_TABLE); + List allFiles = activeDbStore.getDb().getSstFileList(); + String snapshotName1 = "snapshot1"; + writeClient.createSnapshot(volumeName, bucketName2, snapshotName1); - List allFiles = store.getDb().getSstFileList(); + with().atMost(Duration.ofSeconds(120)) + .pollInterval(Duration.ofSeconds(1)) + .await() + .until(() -> filteringService.getSnapshotFilteredCount().get() >= 1); - writeClient.createSnapshot("vol1", "buck2", "snapshot1"); + assertEquals(1, filteringService.getSnapshotFilteredCount().get()); - GenericTestUtils.waitFor( - () -> sstFilteringService.getSnapshotFilteredCount().get() >= 1, 1000, - 10000); - - Assert - .assertEquals(1, sstFilteringService.getSnapshotFilteredCount().get()); + Set keysFromActiveDb = getKeysFromDb(om.getMetadataManager(), + volumeName, bucketName2); + Set keysFromSnapshot = + getKeysFromSnapshot(volumeName, bucketName2, snapshotName1); + assertEquals(keysFromActiveDb, keysFromSnapshot); SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable() - .get(SnapshotInfo.getTableKey("vol1", "buck2", "snapshot1")); + .get(SnapshotInfo.getTableKey(volumeName, bucketName2, snapshotName1)); String dbSnapshots = rocksDbDir + OM_KEY_PREFIX + OM_SNAPSHOT_DIR; String snapshotDirName = @@ -214,61 +216,61 @@ public void testIrrelevantSstFileDeletion() for (LiveFileMetaData file : allFiles) { File sstFile = new File(snapshotDirName + OM_KEY_PREFIX + file.fileName()); - if (nonlevelOFiles.stream() + if (nonLevelOFiles.stream() .anyMatch(o -> file.fileName().equals(o.fileName()))) { - Assert.assertFalse(sstFile.exists()); + assertFalse(sstFile.exists()); } else { - Assert.assertTrue(sstFile.exists()); + assertTrue(sstFile.exists()); } } List processedSnapshotIds = Files .readAllLines(Paths.get(dbSnapshots, OzoneConsts.FILTERED_SNAPSHOTS)); - Assert.assertTrue( + assertTrue( processedSnapshotIds.contains(snapshotInfo.getSnapshotId().toString())); + String snapshotName2 = "snapshot2"; long count; - // Prevent the new snapshot from being filtered try (BootstrapStateHandler.Lock lock = - sstFilteringService.getBootstrapStateLock().lock()) { - count = sstFilteringService.getSnapshotFilteredCount().get(); - writeClient.createSnapshot("vol1", "buck2", "snapshot2"); - - // Confirm that it is not filtered - assertThrows(TimeoutException.class, () -> GenericTestUtils.waitFor( - () -> sstFilteringService.getSnapshotFilteredCount().get() > count, - 1000, 10000)); - assertEquals(count, sstFilteringService.getSnapshotFilteredCount().get()); + filteringService.getBootstrapStateLock().lock()) { + count = filteringService.getSnapshotFilteredCount().get(); + writeClient.createSnapshot(volumeName, bucketName2, snapshotName2); + + assertThrows(ConditionTimeoutException.class, () -> with() + .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofSeconds(1)) + .await() + .until(() -> + filteringService.getSnapshotFilteredCount().get() > count)); + + assertEquals(count, filteringService.getSnapshotFilteredCount().get()); } - // Now allow filtering - GenericTestUtils.waitFor( - () -> sstFilteringService.getSnapshotFilteredCount().get() > count, - 1000, 10000); + + with().atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofSeconds(1)) + .await() + .until(() -> filteringService.getSnapshotFilteredCount().get() > count); + + Set keysFromActiveDb2 = getKeysFromDb(om.getMetadataManager(), + volumeName, bucketName2); + Set keysFromSnapshot2 = + getKeysFromSnapshot(volumeName, bucketName2, snapshotName2); + assertEquals(keysFromActiveDb2, keysFromSnapshot2); } - @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") - private void createKeys(KeyManager keyManager, String volumeName, - String bucketName, int keyCount, int numBlocks) throws IOException { + private void createKeys(String volumeName, + String bucketName, + int keyCount) + throws IOException { for (int x = 0; x < keyCount; x++) { - String keyName = - String.format("key%s", RandomStringUtils.randomAlphanumeric(5)); - // Create Volume and Bucket - createVolumeAndBucket(keyManager, volumeName, bucketName, false); - - // Create the key - createAndCommitKey(writeClient, keyManager, volumeName, bucketName, - keyName, numBlocks); + String keyName = "key-" + RandomStringUtils.randomAlphanumeric(5); + createKey(writeClient, volumeName, bucketName, keyName); } } - private static void createVolumeAndBucket(KeyManager keyManager, - String volumeName, - String bucketName, - boolean isVersioningEnabled) + private void createVolumeAndBucket(String volumeName, + String bucketName) throws IOException { - // cheat here, just create a volume and bucket entry so that we can - // create the keys, we put the same data for key and value since the - // system does not decode the object OMRequestTestUtils.addVolumeToOM(keyManager.getMetadataManager(), OmVolumeArgs.newBuilder() .setOwnerName("o") @@ -279,16 +281,14 @@ private static void createVolumeAndBucket(KeyManager keyManager, OMRequestTestUtils.addBucketToOM(keyManager.getMetadataManager(), OmBucketInfo.newBuilder().setVolumeName(volumeName) .setBucketName(bucketName) - .setIsVersionEnabled(isVersioningEnabled) + .setIsVersionEnabled(false) .build()); } - private static OmKeyArgs createAndCommitKey(OzoneManagerProtocol writeClient, - KeyManager keyManager, - String volumeName, - String bucketName, - String keyName, - int numBlocks) + private void createKey(OzoneManagerProtocol managerProtocol, + String volumeName, + String bucketName, + String keyName) throws IOException { OmKeyArgs keyArg = @@ -302,36 +302,145 @@ private static OmKeyArgs createAndCommitKey(OzoneManagerProtocol writeClient, .setLocationInfoList(new ArrayList<>()) .build(); //Open and Commit the Key in the Key Manager. - OpenKeySession session = writeClient.openKey(keyArg); - for (int i = 0; i < numBlocks; i++) { - keyArg.addLocationInfo(writeClient.allocateBlock(keyArg, session.getId(), - new ExcludeList())); - } - writeClient.commitKey(keyArg, session.getId()); - return keyArg; + OpenKeySession session = managerProtocol.openKey(keyArg); + keyArg.addLocationInfo(managerProtocol.allocateBlock(keyArg, + session.getId(), new ExcludeList())); + managerProtocol.commitKey(keyArg, session.getId()); } @Test public void testFilterFunction() { - Assert.assertTrue(SstFilteringService.FILTER_FUNCTION.apply( + assertTrue(SstFilteringService.FILTER_FUNCTION.apply( "/vol1/bucket1/key1", "/vol1/bucket1/key1", "/vol1/bucket1/")); - Assert.assertTrue(SstFilteringService.FILTER_FUNCTION.apply( + assertTrue(SstFilteringService.FILTER_FUNCTION.apply( "/vol1/bucket1/key1", "/vol1/bucket5/key1", "/vol1/bucket3/")); - Assert.assertFalse(SstFilteringService.FILTER_FUNCTION.apply( + assertFalse(SstFilteringService.FILTER_FUNCTION.apply( "/vol1/bucket1/key1", "/vol1/bucket4/key9", "/vol1/bucket5/")); - Assert.assertFalse(SstFilteringService.FILTER_FUNCTION.apply( + assertFalse(SstFilteringService.FILTER_FUNCTION.apply( "/vol1/bucket1/key1", "/vol1/bucket1/key1", "/vol1/bucket2/")); - Assert.assertFalse(SstFilteringService.FILTER_FUNCTION.apply( + assertFalse(SstFilteringService.FILTER_FUNCTION.apply( "/vol1/bucket1/key1", "/vol1/bucket1/key1", "/vol1/bucket/")); } + + /** + * Test to verify the data integrity after SST filtering service runs. + * This test creates 150 keys randomly in one of the three buckets. It also + * forces flush and compaction after every 50 keys written. + * Once key creation finishes, we create one snapshot per bucket. After that, + * it waits for SSTFilteringService to run for all three snapshots. Once run + * finishes, it validates that keys in active DB buckets are same as in + * snapshot bucket. + */ + @Test + public void testSstFilteringService() throws IOException { + RDBStore activeDbStore = (RDBStore) om.getMetadataManager().getStore(); + String volumeName = "volume"; + List bucketNames = Arrays.asList("bucket", "bucket1", "bucket2"); + + for (String bucketName : bucketNames) { + createVolumeAndBucket(volumeName, bucketName); + } + + int keyCount = 150; + Set keyInBucket = new HashSet<>(); + Set keyInBucket1 = new HashSet<>(); + Set keyInBucket2 = new HashSet<>(); + + Random random = new Random(); + for (int i = 0; i < keyCount; i++) { + String keyName = "key-" + i; + String bucketName; + switch (random.nextInt(1000) % 3) { + case 0: + bucketName = bucketNames.get(0); + keyInBucket.add(keyName); + break; + case 1: + bucketName = bucketNames.get(1); + keyInBucket1.add(keyName); + break; + default: + bucketName = bucketNames.get(2); + keyInBucket2.add(keyName); + } + createKey(writeClient, volumeName, bucketName, keyName); + if (i % 50 == 0) { + activeDbStore.getDb().flush(OmMetadataManagerImpl.KEY_TABLE); + activeDbStore.getDb().compactRange(OmMetadataManagerImpl.KEY_TABLE); + } + } + + List snapshotNames = Arrays.asList("snap", "snap-1", "snap-2"); + + for (int i = 0; i < 3; i++) { + writeClient.createSnapshot(volumeName, bucketNames.get(i), + snapshotNames.get(i)); + } + + SstFilteringService sstFilteringService = + keyManager.getSnapshotSstFilteringService(); + + with().atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofSeconds(1)) + .await() + .until(() -> sstFilteringService.getSnapshotFilteredCount().get() >= 3); + assertEquals(3, sstFilteringService.getSnapshotFilteredCount().get()); + + Set keyInBucketAfterFilteringRun = + getKeysFromSnapshot(volumeName, bucketNames.get(0), + snapshotNames.get(0)); + Set keyInBucket1AfterFilteringRun = + getKeysFromSnapshot(volumeName, bucketNames.get(1), + snapshotNames.get(1)); + Set keyInBucket2AfterFilteringRun = + getKeysFromSnapshot(volumeName, bucketNames.get(2), + snapshotNames.get(2)); + assertEquals(keyInBucket, keyInBucketAfterFilteringRun); + assertEquals(keyInBucket1, keyInBucket1AfterFilteringRun); + assertEquals(keyInBucket2, keyInBucket2AfterFilteringRun); + } + + private Set getKeysFromDb(OMMetadataManager omMetadataReader, + String volume, + String bucket) throws IOException { + Set allKeys = new HashSet<>(); + + String startKey = null; + while (true) { + List omKeyInfoList = omMetadataReader.listKeys(volume, bucket, + startKey, null, 1000); + if (omKeyInfoList.isEmpty()) { + break; + } + for (OmKeyInfo omKeyInfo : omKeyInfoList) { + allKeys.add(omKeyInfo.getKeyName()); + } + startKey = omKeyInfoList.get(omKeyInfoList.size() - 1).getKeyName(); + } + return allKeys; + } + + private Set getKeysFromSnapshot(String volume, + String bucket, + String snapshot) throws IOException { + SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable() + .get(SnapshotInfo.getTableKey(volume, bucket, snapshot)); + try (ReferenceCounted + snapshotMetadataReader = om.getOmSnapshotManager() + .getSnapshotCache() + .get(snapshotInfo.getTableKey())) { + OmSnapshot omSnapshot = (OmSnapshot) snapshotMetadataReader.get(); + return getKeysFromDb(omSnapshot.getMetadataManager(), volume, bucket); + } + } }