diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index af57cbaac90..f089eb1d280 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1108,9 +1108,11 @@ trait OptimisticTransactionImpl extends DeltaTransaction
   def createAutoCompactStatsCollector(): AutoCompactPartitionStatsCollector = {
     try {
       if (spark.conf.get(DeltaSQLConf.DELTA_AUTO_COMPACT_RECORD_PARTITION_STATS_ENABLED)) {
+        val maxFileSize = spark.conf
+          .get(DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE)
         val minFileSize = spark.conf
           .get(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_FILE_SIZE)
-          .getOrElse(Long.MaxValue)
+          .getOrElse(maxFileSize / 2L)
         return AutoCompactPartitionStats.instance(spark)
           .createStatsCollector(minFileSize, reportAutoCompactStatsError)
       }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala
index 5c3b9ff95c4..d223a9bbcc6 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.delta.commands.optimize._
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta.stats.AutoCompactPartitionStats
 
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
@@ -146,15 +145,6 @@ trait AutoCompactBase extends PostCommitHook with DeltaLogging {
         opType,
         maxDeletedRowsRatio
       )
-      val partitionsStats = AutoCompactPartitionStats.instance(spark)
-      // Mark partitions as compacted before releasing them.
-      // Otherwise an already compacted partition might get picked up by a concurrent thread.
-      // But only marks it as compacted, if no exception was thrown by auto compaction so that the
-      // partitions stay eligible for subsequent auto compactions.
-      partitionsStats.markPartitionsAsCompacted(
-        tableId,
-        autoCompactRequest.allowedPartitions
-      )
       metrics
     } catch {
       case e: Throwable =>
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala
index d9e24d89a45..8a4a5e9d154 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala
@@ -56,28 +56,23 @@ class AutoCompactPartitionStats(
 
   /**
    * This class to store the states of one table partition. These state includes:
-   * -- the number of small files,
-   * -- the thread that assigned to compact this partition, and
-   * -- whether the partition was compacted.
+   * -- the number of small files and
+   * -- the thread that assigned to compact this partition.
    *
    * Note: Since this class keeps tracking of the statistics of the table partition and the state of
    * the auto compaction thread that works on the table partition, any method that accesses any
    * attribute of this class needs to be protected by synchronized context.
   */
  class PartitionStat(
-      var numFiles: Long,
-      var wasAutoCompacted: Boolean = false) {
+      var numFiles: Long) {
 
    /**
-     * Determine whether this partition can be autocompacted based on the number of small files or
-     * if this [[AutoCompactPartitionStats]] instance has not auto compacted it yet.
-     * @param minNumFiles The minimum number of files this table-partition should have to trigger
-     *                    Auto Compaction in case it has already been compacted once.
+     * Determine whether this partition should be autocompacted based on the number of small files.
+     * @param minNumFiles The minimum number of files this table-partition must have to trigger
+     *                    Auto Compaction.
     */
-    def hasSufficientSmallFilesOrHasNotBeenCompacted(minNumFiles: Long): Boolean =
-      !wasAutoCompacted || hasSufficientFiles(minNumFiles)
 
-    def hasSufficientFiles(minNumFiles: Long): Boolean = numFiles >= minNumFiles
+    def hasSufficientSmallFiles(minNumFiles: Long): Boolean = numFiles >= minNumFiles
  }
 
  /**
@@ -305,21 +300,12 @@ class AutoCompactPartitionStats(
    tablePartitionStatsCache.get(tableId).map { tablePartitionStates =>
      targetPartitions.filter { partitionKey =>
        tablePartitionStates.get(partitionKey.##).exists { partitionState =>
-          partitionState.hasSufficientSmallFilesOrHasNotBeenCompacted(minNumFiles)
+          partitionState.hasSufficientSmallFiles(minNumFiles)
        }
      }
    }.getOrElse(Set.empty)
  }
 
-  def markPartitionsAsCompacted(tableId: String, compactedPartitions: Set[PartitionKey])
-    : Unit = synchronized {
-    tablePartitionStatsCache.get(tableId).foreach { tablePartitionStats =>
-      compactedPartitions
-        .foreach(partitionKey => tablePartitionStats.get(partitionKey.##)
-          .foreach(_.wasAutoCompacted = true))
-    }
-  }
-
  /**
   * Collect the number of files, which are less than minFileSize, added to or removed from each
   * partition from `actions`.
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala
index dbdab268553..42efd298553 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala
@@ -152,12 +152,13 @@ class AutoCompactSuite extends
      DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED.key -> s"true",
      DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "30") {
      val path = dir.getCanonicalPath
-      // Append 1 file to each partition: record runOnModifiedPartitions event, as is first write
+      // Append 1 file to each partition: record skipInsufficientFilesInModifiedPartitions event,
+      // as not enough small files exist
      var usageLogs = captureOptimizeLogs(AutoCompact.OP_TYPE) {
        createFilesToPartitions(numFilePartitions = 3, numFilesPerPartition = 1, path)
      }
      var log = JsonUtils.mapper.readValue[Map[String, String]](usageLogs.head.blob)
-      assert(log("status") == "runOnModifiedPartitions" && log("partitions") == "3")
+      assert(log("status") == "skipInsufficientFilesInModifiedPartitions")
      // Append 10 more file to each partition: record skipInsufficientFilesInModifiedPartitions
      // event.
      usageLogs = captureOptimizeLogs(AutoCompact.OP_TYPE) {
@@ -196,7 +197,7 @@ class AutoCompactSuite extends
    df.write.format("delta").mode("append").save(dir)
    val deltaLog = DeltaLog.forTable(spark, dir)
    val newSnapshot = deltaLog.update()
-    assert(newSnapshot.version === 1) // 0 is the first commit, 1 is optimize
+    assert(newSnapshot.version === 1)
    assert(deltaLog.update().numOfFiles === 1)
 
    val isLogged = checkAutoOptimizeLogging {
@@ -283,34 +284,54 @@ class AutoCompactSuite extends
  }
 
  testBothModesViaProperty("auto compact should not kick in when there aren't " +
-    "enough files") { dir =>
-    withSQLConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "5") {
+    "enough small files") { dir =>
+    withSQLConf(
+      DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "6",
+      DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE.key -> "20000"
+    ) {
      AutoCompactPartitionStats.instance(spark).resetTestOnly()
-      spark.range(10).repartition(4).write.format("delta").mode("append").save(dir)
+      // First write - 4 small files
+      spark.range(10).repartition(4).write.format("delta").mode("append").save(dir)
      val deltaLog = DeltaLog.forTable(spark, dir)
      val newSnapshot = deltaLog.update()
      assert(newSnapshot.version === 0)
-      assert(deltaLog.update().numOfFiles === 4)
+      assert(deltaLog.update().numOfFiles === 4, "Should have 4 initial small files")
 
+      // Second write - 4 large files
+      spark.range(10000).repartition(4).write.format("delta").mode("append").save(dir)
+
+      val writeEvent = deltaLog.history.getHistory(Some(1)).head
+      assert(writeEvent.operation === "WRITE",
+        "Large files shouldn't trigger auto compaction")
+      assert(deltaLog.update().numOfFiles === 8,
+        "Should have 4 small + 4 large files")
+
+      // Third write - 2 more small files to reach minNumFiles
      val isLogged2 = checkAutoOptimizeLogging {
-        spark.range(10).repartition(4).write.format("delta").mode("append").save(dir)
+        spark.range(10).repartition(2).write.format("delta").mode("append").save(dir)
      }
 
-      assert(isLogged2)
-      val lastEvent = deltaLog.history.getHistory(Some(1)).head
-      assert(lastEvent.operation === "OPTIMIZE")
-      assert(lastEvent.operationParameters("auto") === "true")
+      val compactionEvent = deltaLog.history.getHistory(Some(3)).head
+      assert(compactionEvent.operation === "OPTIMIZE",
+        "Should trigger compaction with 6 small files")
+      assert(compactionEvent.operationParameters("auto") === "true")
 
-      assert(deltaLog.update().numOfFiles === 1, "Files should be optimized into a single one")
+      val finalSnapshot = deltaLog.update()
+      assert(finalSnapshot.numOfFiles === 5,
+        "Should have 4 large files + 1 compacted small file")
 
      checkAnswer(
        spark.read.format("delta").load(dir),
-        spark.range(10).union(spark.range(10)).toDF()
+        spark.range(10)
+          .union(spark.range(10000))
+          .union(spark.range(10))
+          .toDF()
      )
    }
  }
 
+
  testBothModesViaProperty("ensure no NPE in auto compact UDF with null " +
    "partition values") { dir =>
    Seq(null, "", " ").zipWithIndex.foreach { case (partValue, i) =>
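
Reviewer note: the Scala sketch below is a minimal, self-contained illustration of the two behavior changes in this patch, not the Delta Lake sources themselves. First, partition eligibility now depends only on the current count of small files, since the wasAutoCompacted flag and markPartitionsAsCompacted are removed. Second, when DELTA_AUTO_COMPACT_MIN_FILE_SIZE is unset, the stats collector now falls back to half of DELTA_AUTO_COMPACT_MAX_FILE_SIZE instead of Long.MaxValue. The object name, method names, and the 128 MB value are illustrative only.

// Hypothetical stand-alone sketch; mirrors the simplified logic, not the actual Delta classes.
object AutoCompactEligibilitySketch {

  // Mirrors the trimmed-down PartitionStat: no "already compacted" state, only a small-file count.
  final class PartitionStat(var numFiles: Long) {
    def hasSufficientSmallFiles(minNumFiles: Long): Boolean = numFiles >= minNumFiles
  }

  // Mirrors the new default in createAutoCompactStatsCollector: if the min-file-size conf is
  // unset, use half of the max-file-size conf rather than Long.MaxValue.
  def resolveMinFileSize(minFileSizeConf: Option[Long], maxFileSize: Long): Long =
    minFileSizeConf.getOrElse(maxFileSize / 2L)

  def main(args: Array[String]): Unit = {
    val stat = new PartitionStat(numFiles = 4)
    println(stat.hasSufficientSmallFiles(minNumFiles = 6)) // false: only 4 small files so far
    stat.numFiles += 2
    println(stat.hasSufficientSmallFiles(minNumFiles = 6)) // true: threshold reached, partition is eligible

    // With an (assumed) 128 MB max file size and no explicit min file size, the
    // stats-collector threshold becomes 64 MB instead of Long.MaxValue.
    println(resolveMinFileSize(minFileSizeConf = None, maxFileSize = 128L * 1024 * 1024))
  }
}

Under the old behavior a partition that had never been auto compacted by this instance was always eligible regardless of its file count; with this change it only becomes (or stays) eligible once it has accumulated at least minNumFiles small files, which is what the updated AutoCompactSuite tests above exercise.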