[HUDI-6516] Correct the use of hoodie.bootstrap.mode.selector (apache…
Zouxxyy authored Jul 23, 2023
1 parent 629349c commit c14ac0e
Showing 5 changed files with 44 additions and 54 deletions.
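
The diff below removes the executor logic that silently swapped in FullRecordBootstrapModeSelector whenever the FULL_RECORD regex-match mode was configured, so the class named by hoodie.bootstrap.mode.selector is now honored as-is. For orientation, here is a minimal sketch (not part of the commit) of a bootstrap write that uses the corrected option; the table name, paths, and field names are hypothetical, and a Spark session with the Hudi bundle on the classpath is assumed:

import org.apache.spark.sql.{SaveMode, SparkSession}

object BootstrapSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-bootstrap-sketch").getOrCreate()

    // A Hudi bootstrap is an empty-DataFrame write driven entirely by options.
    spark.emptyDataFrame.write
      .format("hudi")
      .option("hoodie.table.name", "bootstrap_tbl")                     // hypothetical
      .option("hoodie.datasource.write.operation", "bootstrap")
      .option("hoodie.bootstrap.base.path", "/data/source_parquet")     // hypothetical source
      // After this fix, the selector class alone decides the bootstrap mode:
      .option("hoodie.bootstrap.mode.selector",
        "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector")
      .option("hoodie.datasource.write.recordkey.field", "_row_key")    // hypothetical
      .option("hoodie.datasource.write.partitionpath.field", "datestr") // hypothetical
      .option("hoodie.bootstrap.keygen.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
      .mode(SaveMode.Overwrite)
      .save("/data/hudi/bootstrap_tbl")                                 // hypothetical target
  }
}

MetadataOnlyBootstrapModeSelector (the default) links only file metadata, FullRecordBootstrapModeSelector rewrites full records, and BootstrapRegexModeSelector assigns a mode per partition by regex.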
@@ -31,6 +31,9 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

+/**
+ * A bootstrap selector which employs bootstrap mode by specified partitions.
+ */
public class BootstrapRegexModeSelector extends BootstrapModeSelector {

private static final long serialVersionUID = 1L;
@@ -26,7 +26,6 @@
import org.apache.hudi.client.bootstrap.HoodieBootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
-import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -73,7 +72,6 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -115,10 +113,6 @@ private void validate() {
"Ensure Bootstrap Source Path is set");
checkArgument(config.getBootstrapModeSelectorClass() != null,
"Ensure Bootstrap Partition Selector is set");
-    if (METADATA_ONLY.name().equals(config.getBootstrapModeSelectorRegex())) {
-      checkArgument(!config.getBootstrapModeSelectorClass().equals(FullRecordBootstrapModeSelector.class.getCanonicalName()),
-          "FullRecordBootstrapModeSelector cannot be used with METADATA_ONLY bootstrap mode");
-    }
}

@Override
@@ -320,18 +314,8 @@ private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndPr
BootstrapModeSelector selector =
(BootstrapModeSelector) ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);

-    Map<BootstrapMode, List<String>> result = new HashMap<>();
-    // for FULL_RECORD mode, original record along with metadata fields are needed
-    if (FULL_RECORD.equals(config.getBootstrapModeForRegexMatch())) {
-      if (!(selector instanceof FullRecordBootstrapModeSelector)) {
-        FullRecordBootstrapModeSelector fullRecordBootstrapModeSelector = new FullRecordBootstrapModeSelector(config);
-        result.putAll(fullRecordBootstrapModeSelector.select(folders));
-      } else {
-        result.putAll(selector.select(folders));
-      }
-    } else {
-      result = selector.select(folders);
-    }
+    Map<BootstrapMode, List<String>> result = selector.select(folders);

Map<String, List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(
Collectors.toMap(Pair::getKey, Pair::getValue));

@@ -272,7 +272,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
metaClient.reloadActiveTimeline();
assertEquals(0, metaClient.getCommitsTimeline().countInstants());
assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath, context)
-        .stream().flatMap(f -> f.getValue().stream()).count());
+        .stream().mapToLong(f -> f.getValue().size()).sum());

BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
assertFalse(index.useIndex());
@@ -295,7 +295,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec

// Upsert case
long updateTimestamp = Instant.now().toEpochMilli();
-    String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2";
+    String updateSPath = tmpFolder.toAbsolutePath() + "/data2";
generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath);
JavaRDD<HoodieRecord> updateBatch =
generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), updateSPath, context),
@@ -390,7 +390,6 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
Dataset<Row> missingBootstrapped = sqlContext.sql("select a._hoodie_record_key from bootstrapped a "
+ "where a._hoodie_record_key not in (select _row_key from original)");
assertEquals(0, missingBootstrapped.count());
-      //sqlContext.sql("select * from bootstrapped").show(10, false);
}

// RO Input Format Read
@@ -410,7 +409,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
}
assertEquals(totalRecords, seenKeys.size());

-    //RT Input Format Read
+    // RT Input Format Read
reloadInputFormats();
seenKeys = new HashSet<>();
records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
@@ -475,7 +474,7 @@ private void checkBootstrapResults(int totalRecords, Schema schema, String insta
}
assertEquals(totalRecords, seenKeys.size());

-    //RT Input Format Read - Project only non-hoodie column
+    // RT Input Format Read - Project only non-hoodie column
reloadInputFormats();
seenKeys = new HashSet<>();
records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
@@ -108,8 +108,8 @@ private static Stream<Arguments> testArgs() {
for (Boolean dash : dashPartitions) {
for (String bt : bootstrapType) {
for (Integer n : nPartitions) {
-          //can't be mixed bootstrap if it's nonpartitioned
-          //don't need to test slash partitions if it's nonpartitioned
+          // can't be mixed bootstrap if it's nonpartitioned
+          // don't need to test slash partitions if it's nonpartitioned
if ((!bt.equals("mixed") && dash) || n > 0) {
b.add(Arguments.of(bt, dash, tt, n));
}
@@ -129,7 +129,7 @@ public void runTests(String bootstrapType, Boolean dashPartitions, HoodieTableTy
this.nPartitions = nPartitions;
setupDirs();

-    //do bootstrap
+    // do bootstrap
Map<String, String> options = setBootstrapOptions();
Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
bootstrapDf.write().format("hudi")
@@ -139,7 +139,7 @@ public void runTests(String bootstrapType, Boolean dashPartitions, HoodieTableTy
compareTables();
verifyMetaColOnlyRead(0);

-    //do upserts
+    // do upserts
options = basicOptions();
doUpdate(options, "001");
compareTables();
@@ -224,8 +224,8 @@ protected void doUpsert(Map<String,String> options, Dataset<Row> df) {
.mode(SaveMode.Append)
.save(hudiBasePath);
if (bootstrapType.equals("mixed")) {
-      //mixed tables have a commit for each of the metadata and full bootstrap modes
-      //so to align with the regular hudi table, we need to compact after 4 commits instead of 3
+      // mixed tables have a commit for each of the metadata and full bootstrap modes
+      // so to align with the regular hudi table, we need to compact after 4 commits instead of 3
nCompactCommits = "4";
}
df.write().format("hudi")
@@ -17,9 +17,8 @@

package org.apache.hudi.functional

-import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
-import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
+import org.apache.hudi.client.bootstrap.selector.{FullRecordBootstrapModeSelector, MetadataOnlyBootstrapModeSelector}
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
Expand All @@ -30,9 +29,11 @@ import org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols, sort
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.HoodieClientTestUtils
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkRecordMerger}

+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.functions.{col, lit}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.io.TempDir
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -75,12 +76,14 @@ class TestDataSourceForBootstrap {
val verificationCol: String = "driver"
val originalVerificationVal: String = "driver_0"
val updatedVerificationVal: String = "driver_update"
+  val metadataOnlySelector: String = classOf[MetadataOnlyBootstrapModeSelector].getCanonicalName
+  val fullRecordSelector: String = classOf[FullRecordBootstrapModeSelector].getCanonicalName

/**
* TODO rebase onto existing test base-class to avoid duplication
*/
@BeforeEach
-  def initialize(@TempDir tempDir: java.nio.file.Path) {
+  def initialize(@TempDir tempDir: java.nio.file.Path): Unit = {
val sparkConf = HoodieClientTestUtils.getSparkConfForTest(getClass.getSimpleName)

spark = SparkSession.builder.config(sparkConf).getOrCreate
@@ -119,7 +122,7 @@ class TestDataSourceForBootstrap {
// Perform bootstrap
val bootstrapKeygenClass = classOf[NonpartitionedKeyGenerator].getName
val options = commonOpts.-(DataSourceWriteOptions.PARTITIONPATH_FIELD.key)
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
extraOpts = options ++ Map(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> bootstrapKeygenClass),
bootstrapKeygenClass = bootstrapKeygenClass
@@ -166,13 +169,13 @@ class TestDataSourceForBootstrap {

@ParameterizedTest
@CsvSource(value = Array(
"METADATA_ONLY,AVRO",
"org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector,AVRO",
// TODO(HUDI-5807) enable for spark native records
/* "METADATA_ONLY,SPARK", */
"FULL_RECORD,AVRO",
"FULL_RECORD,SPARK"
/* "org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,SPARK", */
"org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,AVRO",
"org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector,SPARK"
))
-  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String, recordType: HoodieRecordType): Unit = {
+  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapSelector: String, recordType: HoodieRecordType): Unit = {
val timestamp = Instant.now.toEpochMilli
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)

@@ -189,22 +192,22 @@ class TestDataSourceForBootstrap {
val readOpts = commonOpts ++ Map(
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr",
DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
-      HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key -> bootstrapMode
+      HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key -> bootstrapSelector
)

// Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
readOpts ++ getRecordTypeOpts(recordType),
classOf[SimpleKeyGenerator].getName)

// check marked directory clean up
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))

-    val expectedDF = bootstrapMode match {
-      case "METADATA_ONLY" =>
+    val expectedDF = bootstrapSelector match {
+      case `metadataOnlySelector` =>
sort(sourceDF)
case "FULL_RECORD" =>
case `fullRecordSelector` =>
sort(sourceDF)
}

@@ -271,7 +274,7 @@ class TestDataSourceForBootstrap {
)

// Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
writeOpts,
classOf[SimpleKeyGenerator].getName)
@@ -346,7 +349,7 @@ class TestDataSourceForBootstrap {
)

// Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
writeOpts,
classOf[SimpleKeyGenerator].getName)
@@ -414,7 +417,7 @@ class TestDataSourceForBootstrap {
)

// Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
writeOpts,
classOf[SimpleKeyGenerator].getName)
@@ -481,7 +484,7 @@ class TestDataSourceForBootstrap {
)

// Perform bootstrap
-    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+    val commitInstantTime1 = runBootstrapAndVerifyCommit(
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
writeOpts,
classOf[SimpleKeyGenerator].getName)
@@ -616,9 +619,9 @@ class TestDataSourceForBootstrap {
verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true)
}

-  def runMetadataBootstrapAndVerifyCommit(tableType: String,
-                                          extraOpts: Map[String, String] = Map.empty,
-                                          bootstrapKeygenClass: String): String = {
+  def runBootstrapAndVerifyCommit(tableType: String,
+                                  extraOpts: Map[String, String] = Map.empty,
+                                  bootstrapKeygenClass: String): String = {
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write
.format("hudi")
Expand All @@ -632,7 +635,8 @@ class TestDataSourceForBootstrap {

val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
val expectedBootstrapInstant =
if ("FULL_RECORD".equals(extraOpts.getOrElse(HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.key, HoodieBootstrapConfig.PARTITION_SELECTOR_REGEX_MODE.defaultValue)))
if (fullRecordSelector.equals(extraOpts.getOrElse(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key,
HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.defaultValue)))
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS
else HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS
assertEquals(expectedBootstrapInstant, commitInstantTime1)
@@ -689,9 +693,9 @@ class TestDataSourceForBootstrap {

object TestDataSourceForBootstrap {

-  def sort(df: DataFrame) = df.sort("_row_key")
+  def sort(df: DataFrame): Dataset[Row] = df.sort("_row_key")

-  def dropMetaCols(df: DataFrame) =
+  def dropMetaCols(df: DataFrame): DataFrame =
df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)

}
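
The rewritten runBootstrapAndVerifyCommit helper above encodes a handy invariant: immediately after a bootstrap write, the first commit instant on the timeline reveals which mode ran. A sketch of that check, assuming the same fs and basePath handles and imports as in the test class above:

// Reserved instant times distinguish the two kinds of bootstrap commit.
val firstInstant = HoodieDataSourceHelpers.latestCommit(fs, basePath)
val ranFullRecord = firstInstant == HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS
val ranMetadataOnly = firstInstant == HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS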
