[Spark] Auto Compaction was incorrectly including large files towards minNumFiles #4045 #4178

Open
wants to merge 12 commits into master
@@ -1108,9 +1108,11 @@ trait OptimisticTransactionImpl extends DeltaTransaction
def createAutoCompactStatsCollector(): AutoCompactPartitionStatsCollector = {
try {
if (spark.conf.get(DeltaSQLConf.DELTA_AUTO_COMPACT_RECORD_PARTITION_STATS_ENABLED)) {
val maxFileSize = spark.conf
.get(DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE)
val minFileSize = spark.conf
.get(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_FILE_SIZE)
.getOrElse(Long.MaxValue)
.getOrElse(maxFileSize / 2L)
return AutoCompactPartitionStats.instance(spark)
.createStatsCollector(minFileSize, reportAutoCompactStatsError)
}
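The behavioral change in this hunk: when DELTA_AUTO_COMPACT_MIN_FILE_SIZE is unset, the stats collector's small-file threshold no longer falls back to Long.MaxValue (which made every written file count as "small") but to half of the configured max compaction file size. A self-contained sketch of the effect, using illustrative sizes rather than the real DeltaSQLConf plumbing:

```scala
object MinFileSizeFallbackSketch extends App {
  // Illustrative values only; the real thresholds come from
  // DELTA_AUTO_COMPACT_MAX_FILE_SIZE / DELTA_AUTO_COMPACT_MIN_FILE_SIZE.
  val maxFileSize = 128L * 1024 * 1024          // assume a 128 MB target file size
  val oldDefaultMinFileSize = Long.MaxValue     // old fallback: every file counts as "small"
  val newDefaultMinFileSize = maxFileSize / 2L  // new fallback: only files under 64 MB count

  def countsTowardMinNumFiles(fileSize: Long, minFileSize: Long): Boolean =
    fileSize < minFileSize

  val largeFile = 100L * 1024 * 1024            // e.g. a 100 MB file from a bulk write
  assert(countsTowardMinNumFiles(largeFile, oldDefaultMinFileSize))   // old: counted
  assert(!countsTowardMinNumFiles(largeFile, newDefaultMinFileSize))  // new: ignored
}
```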
@@ -24,7 +24,6 @@ import org.apache.spark.sql.delta.commands.optimize._
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.AutoCompactPartitionStats

import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
@@ -146,15 +145,6 @@ trait AutoCompactBase extends PostCommitHook with DeltaLogging {
opType,
maxDeletedRowsRatio
)
val partitionsStats = AutoCompactPartitionStats.instance(spark)
// Mark partitions as compacted before releasing them.
// Otherwise an already compacted partition might get picked up by a concurrent thread.
// But only marks it as compacted, if no exception was thrown by auto compaction so that the
// partitions stay eligible for subsequent auto compactions.
partitionsStats.markPartitionsAsCompacted(
tableId,
autoCompactRequest.allowedPartitions
)
metrics
} catch {
case e: Throwable =>
@@ -56,28 +56,23 @@ class AutoCompactPartitionStats(

/**
* This class stores the state of one table partition. This state includes:
* -- the number of small files,
* -- the thread that assigned to compact this partition, and
* -- whether the partition was compacted.
* -- the number of small files and
* -- the thread assigned to compact this partition.
*
* Note: Since this class keeps track of the statistics of the table partition and the state of
* the auto compaction thread that works on the table partition, any method that accesses any
* attribute of this class needs to be protected by a synchronized context.
*/
class PartitionStat(
var numFiles: Long,
var wasAutoCompacted: Boolean = false) {
var numFiles: Long) {

/**
* Determine whether this partition can be autocompacted based on the number of small files or
* if this [[AutoCompactPartitionStats]] instance has not auto compacted it yet.
* @param minNumFiles The minimum number of files this table-partition should have to trigger
* Auto Compaction in case it has already been compacted once.
* Determine whether this partition should be autocompacted based on the number of small files.
* @param minNumFiles The minimum number of files this table-partition must have to trigger
* Auto Compaction.
*/
def hasSufficientSmallFilesOrHasNotBeenCompacted(minNumFiles: Long): Boolean =
!wasAutoCompacted || hasSufficientFiles(minNumFiles)

def hasSufficientFiles(minNumFiles: Long): Boolean = numFiles >= minNumFiles
def hasSufficientSmallFiles(minNumFiles: Long): Boolean = numFiles >= minNumFiles
}

/**
@@ -305,21 +300,12 @@ class AutoCompactPartitionStats(
tablePartitionStatsCache.get(tableId).map { tablePartitionStates =>
targetPartitions.filter { partitionKey =>
tablePartitionStates.get(partitionKey.##).exists { partitionState =>
partitionState.hasSufficientSmallFilesOrHasNotBeenCompacted(minNumFiles)
partitionState.hasSufficientSmallFiles(minNumFiles)
}
}
}.getOrElse(Set.empty)
}

def markPartitionsAsCompacted(tableId: String, compactedPartitions: Set[PartitionKey])
: Unit = synchronized {
tablePartitionStatsCache.get(tableId).foreach { tablePartitionStats =>
compactedPartitions
.foreach(partitionKey => tablePartitionStats.get(partitionKey.##)
.foreach(_.wasAutoCompacted = true))
}
}

/**
* Collect the number of files smaller than minFileSize that are added to or removed from each
* partition by `actions`.
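With wasAutoCompacted and markPartitionsAsCompacted gone, partition eligibility is driven purely by the tracked small-file count: small files added by writes push the count up, and the small files removed by a compaction pull it back down. A simplified, self-contained model of that bookkeeping, assuming the compaction commit's removes flow through the same add/remove collection described above (hypothetical types and sizes, not the real AutoCompactPartitionStats API):

```scala
object SmallFileAccountingSketch extends App {
  // Hypothetical per-partition counter: +1 for each small file added,
  // -1 for each small file removed.
  final class PartitionStat(var numFiles: Long) {
    def hasSufficientSmallFiles(minNumFiles: Long): Boolean = numFiles >= minNumFiles
    def record(added: Seq[Long], removed: Seq[Long], minFileSize: Long): Unit =
      numFiles += added.count(_ < minFileSize) - removed.count(_ < minFileSize)
  }

  val minFileSize = 10000L
  val minNumFiles = 6L
  val stat = new PartitionStat(numFiles = 0L)

  // Six small writes make the partition eligible ...
  stat.record(added = Seq.fill(6)(500L), removed = Nil, minFileSize = minFileSize)
  assert(stat.hasSufficientSmallFiles(minNumFiles))

  // ... and the compaction commit (remove 6 small files, add 1 large file) drops the
  // count again, so no separate "was compacted" flag is needed to keep the partition
  // from being picked up again right after it was compacted.
  stat.record(added = Seq(50000L), removed = Seq.fill(6)(500L), minFileSize = minFileSize)
  assert(!stat.hasSufficientSmallFiles(minNumFiles))
}
```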
@@ -152,12 +152,13 @@ class AutoCompactSuite extends
DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED.key -> s"true",
DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "30") {
val path = dir.getCanonicalPath
// Append 1 file to each partition: record runOnModifiedPartitions event, as is first write
// Append 1 file to each partition: record skipInsufficientFilesInModifiedPartitions event,
// as not enough small files exist
var usageLogs = captureOptimizeLogs(AutoCompact.OP_TYPE) {
createFilesToPartitions(numFilePartitions = 3, numFilesPerPartition = 1, path)
}
var log = JsonUtils.mapper.readValue[Map[String, String]](usageLogs.head.blob)
assert(log("status") == "runOnModifiedPartitions" && log("partitions") == "3")
assert(log("status") == "skipInsufficientFilesInModifiedPartitions")
// Append 10 more files to each partition: record skipInsufficientFilesInModifiedPartitions
// event.
usageLogs = captureOptimizeLogs(AutoCompact.OP_TYPE) {
@@ -196,7 +197,7 @@ class AutoCompactSuite extends
df.write.format("delta").mode("append").save(dir)
val deltaLog = DeltaLog.forTable(spark, dir)
val newSnapshot = deltaLog.update()
assert(newSnapshot.version === 1) // 0 is the first commit, 1 is optimize
assert(newSnapshot.version === 1)
assert(deltaLog.update().numOfFiles === 1)

val isLogged = checkAutoOptimizeLogging {
@@ -283,34 +284,54 @@ class AutoCompactSuite extends
}

testBothModesViaProperty("auto compact should not kick in when there aren't " +
"enough files") { dir =>
withSQLConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "5") {
"enough small files") { dir =>
withSQLConf(
DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "6",
DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE.key -> "20000"
) {
AutoCompactPartitionStats.instance(spark).resetTestOnly()
spark.range(10).repartition(4).write.format("delta").mode("append").save(dir)

// First write - 4 small files
spark.range(10).repartition(4).write.format("delta").mode("append").save(dir)
val deltaLog = DeltaLog.forTable(spark, dir)
val newSnapshot = deltaLog.update()
assert(newSnapshot.version === 0)
assert(deltaLog.update().numOfFiles === 4)
assert(deltaLog.update().numOfFiles === 4, "Should have 4 initial small files")

// Second write - 4 large files
spark.range(10000).repartition(4).write.format("delta").mode("append").save(dir)

val writeEvent = deltaLog.history.getHistory(Some(1)).head
assert(writeEvent.operation === "WRITE",
"Large files shouldn't trigger auto compaction")
assert(deltaLog.update().numOfFiles === 8,
"Should have 4 small + 4 large files")

// Third write - 2 more small files to reach minNumFiles
val isLogged2 = checkAutoOptimizeLogging {
spark.range(10).repartition(4).write.format("delta").mode("append").save(dir)
spark.range(10).repartition(2).write.format("delta").mode("append").save(dir)
}

assert(isLogged2)
val lastEvent = deltaLog.history.getHistory(Some(1)).head
assert(lastEvent.operation === "OPTIMIZE")
assert(lastEvent.operationParameters("auto") === "true")
val compactionEvent = deltaLog.history.getHistory(Some(3)).head
assert(compactionEvent.operation === "OPTIMIZE",
"Should trigger compaction with 6 small files")
assert(compactionEvent.operationParameters("auto") === "true")

assert(deltaLog.update().numOfFiles === 1, "Files should be optimized into a single one")
val finalSnapshot = deltaLog.update()
assert(finalSnapshot.numOfFiles === 5,
"Should have 4 large files + 1 compacted small file")

checkAnswer(
spark.read.format("delta").load(dir),
spark.range(10).union(spark.range(10)).toDF()
spark.range(10)
.union(spark.range(10000))
.union(spark.range(10))
.toDF()
)
}
}
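A quick worked check of the accounting the rewritten test expects, with illustrative file sizes (the actual sizes come from the Parquet files the test writes; the small-file threshold is assumed here to default to DELTA_AUTO_COMPACT_MAX_FILE_SIZE / 2 = 10000 bytes, per the OptimisticTransaction change above):

```scala
object AutoCompactTestArithmetic extends App {
  val minFileSizeBytes = 20000L / 2   // maxFileSize = 20000 from the test conf, halved
  val minNumFiles = 6                 // DELTA_AUTO_COMPACT_MIN_NUM_FILES in the test

  // Illustrative sizes: the first and third writes produce small files, while the
  // second write's files are assumed to exceed the 10000-byte threshold.
  val writes = Seq(
    Seq.fill(4)(500L),    // first write: 4 small files
    Seq.fill(4)(50000L),  // second write: 4 large files, ignored by the new accounting
    Seq.fill(2)(500L)     // third write: 2 more small files
  )

  val smallFilesAfterEachWrite =
    writes.map(_.count(_ < minFileSizeBytes)).scanLeft(0)(_ + _).drop(1)

  // Only after the third write does the small-file count reach minNumFiles, which is
  // when the test expects the auto OPTIMIZE commit (table version 3).
  assert(smallFilesAfterEachWrite == Seq(4, 4, 6))
  assert(smallFilesAfterEachWrite.last >= minNumFiles)
}
```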


testBothModesViaProperty("ensure no NPE in auto compact UDF with null " +
"partition values") { dir =>
Seq(null, "", " ").zipWithIndex.foreach { case (partValue, i) =>