[SPARK-46796][SS] Ensure the correct remote files (mentioned in metadata.zip) are used on RocksDB version load

### What changes were proposed in this pull request?

This PR ensures that RocksDB loads do not run into the SST file version ID mismatch issue. RocksDB has added validation to ensure that the exact same SST file is used during database load from a snapshot. The current streaming state implementation suffers from certain edge cases where this condition is violated, resulting in state load failures.

The changes introduced are:

1. Ensure that the local SST file is exactly the same file as the DFS file it maps to in metadata.zip. We keep track of the DFS file path for each local SST file, and re-download the SST file if the DFS file recorded in metadata.zip has a different UUID (a sketch of this check follows the list).
2. Reset `lastSnapshotVersion` in RocksDB when RocksDB is loaded. Changelog checkpointing relies on this version for future snapshots. Previously, if an older version was reloaded, we were not uploading snapshots because `lastSnapshotVersion` was pointing to a higher snapshot of a cleaned-up database.
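
The mapping check in change (1) can be sketched roughly as below. This is a minimal Scala sketch under simplifying assumptions, not the actual `RocksDBFileManager` code: `DfsFile` and `needsRedownload` are hypothetical stand-ins for the real `RocksDBImmutableFile` entries kept in the `localFilesToDfsFiles` map.

```scala
import scala.collection.mutable

// Hypothetical stand-in for RocksDBImmutableFile: one uploaded DFS file,
// identified by its UUID-suffixed name, plus its size in bytes.
case class DfsFile(dfsFileName: String, sizeBytes: Long)

// Local file name (e.g. "001.sst") -> the exact DFS file it was created from.
val localFilesToDfsFiles = mutable.Map.empty[String, DfsFile]

// On version load: reuse a local SST file only if it maps to the exact DFS
// file listed in metadata.zip for that version; otherwise re-download it.
def needsRedownload(localFileName: String, localSize: Long, required: DfsFile): Boolean =
  localFilesToDfsFiles.get(localFileName) match {
    case Some(prev) =>
      // Same UUID-suffixed DFS name and same size => safe to reuse.
      !(prev.dfsFileName == required.dfsFileName && localSize == required.sizeBytes)
    case None =>
      true // unknown provenance: re-download to be safe
  }
```

The point is that reuse is decided against the UUID-suffixed DFS file name recorded in metadata.zip, not merely the local file name and size.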

### Why are the changes needed?

We need to ensure that the correct SST files are used on executors during RocksDB load, as per the mapping in metadata.zip. With the current implementation, it is possible that an executor uses an SST file (with a different UUID) from an older version that is not the exact file mapped in metadata.zip. This can cause version ID mismatch errors while loading RocksDB, leading to streaming query failures.

See https://issues.apache.org/jira/browse/SPARK-46796 for failure scenarios.
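
To make the failure mode concrete, here is a hypothetical timeline sketched in Scala; all file names, sizes, and UUID suffixes below are made up for illustration, and the authoritative scenarios are the ones described in the JIRA:

```scala
// Hypothetical timeline (file names, sizes, and UUID suffixes are made up):
//   v1: local 001.sst is uploaded to DFS as 001-uuidA.sst;
//       metadata-1.zip maps 001.sst -> 001-uuidA.sst
//   v2: 001.sst is recreated with the same name and size and uploaded as
//       001-uuidB.sst; metadata-2.zip maps 001.sst -> 001-uuidB.sst
// An executor that still holds the v1 copy of 001.sst now loads v2.
case class Sst(name: String, sizeBytes: Long)

val localFromV1  = Sst("001.sst", 4096L)
val requiredByV2 = Sst("001.sst", 4096L) // same name and size, different contents

// The pre-patch check compared only name and size, so it cannot tell the
// two apart: the stale v1 file is reused, while the v2 RocksDB manifest
// expects the file that was uploaded as 001-uuidB.sst.
val looksSame = localFromV1.name == requiredByV2.name &&
  localFromV1.sizeBytes == requiredByV2.sizeBytes
assert(looksSame) // passes => stale file reused => version ID mismatch on open
```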

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added exhaustive unit test cases covering the scenarios.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44837 from sahnib/SPARK-46796.

Authored-by: Bhuwan Sahni <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
sahnib authored and HeartSaVioR committed Jan 24, 2024
1 parent 74b6301 commit f25ebe5
Showing 3 changed files with 372 additions and 37 deletions.
@@ -162,6 +162,8 @@ class RocksDB(
val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
loadedVersion = latestSnapshotVersion

// reset last snapshot version
lastSnapshotVersion = 0L
openDB()

numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
@@ -202,6 +204,7 @@
*/
private def replayChangelog(endVersion: Long): Unit = {
for (v <- loadedVersion + 1 to endVersion) {
logInfo(s"replaying changelog from version $loadedVersion -> $endVersion")
var changelogReader: StateStoreChangelogReader = null
try {
changelogReader = fileManager.getChangelogReader(v)
@@ -134,6 +134,15 @@ class RocksDBFileManager(
import RocksDBImmutableFile._

private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]


// Used to keep a mapping of the exact DFS file that was used to create a local SST file.
// The reason this is a separate map is that versionToRocksDBFiles can map multiple similar
// SST files to a particular local file (for example, 1.sst can map to 1-UUID1.sst in v1 and
// 1-UUID2.sst in v2). We need to capture the exact file used to ensure version ID compatibility
// across SST files and the RocksDB manifest.
private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]

private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
private val onlyZipFiles = new PathFilter {
@@ -223,6 +232,7 @@
versionToRocksDBFiles.keySet().removeIf(_ >= version)
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
localFilesToDfsFiles.clear()
localDir.mkdirs()
RocksDBCheckpointMetadata(Seq.empty, 0)
} else {
@@ -459,44 +469,54 @@
// Get the immutable files used in previous versions, as some of those uploaded files can be
// reused for this version
logInfo(s"Saving RocksDB files to DFS for $version")
val prevFilesToSizes = versionToRocksDBFiles.asScala.filter { case (k, _) => k < version }
.values.flatten.map { f =>
f.localFileName -> f
}.toMap

var bytesCopied = 0L
var filesCopied = 0L
var filesReused = 0L

val immutableFiles = localFiles.map { localFile =>
prevFilesToSizes
.get(localFile.getName)
.filter(_.isSameFile(localFile))
.map { reusable =>
filesReused += 1
reusable
}.getOrElse {
val localFileName = localFile.getName
val dfsFileName = newDFSFileName(localFileName)
val dfsFile = dfsFilePath(dfsFileName)
// Note: The implementation of copyFromLocalFile() closes the output stream when there is
// any exception while copying. So this may generate partial files on DFS. But that is
// okay because until the main [version].zip file is written, those partial files are
// not going to be used at all. Eventually these files should get cleared.
fs.copyFromLocalFile(
new Path(localFile.getAbsoluteFile.toURI), dfsFile)
val localFileSize = localFile.length()
logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
filesCopied += 1
bytesCopied += localFileSize

RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
}
val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName)
if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
val dfsFile = existingDfsFile.get
filesReused += 1
logInfo(s"reusing file $dfsFile for $localFile")
RocksDBImmutableFile(localFile.getName, dfsFile.dfsFileName, dfsFile.sizeBytes)
} else {
val localFileName = localFile.getName
val dfsFileName = newDFSFileName(localFileName)
val dfsFile = dfsFilePath(dfsFileName)
// Note: The implementation of copyFromLocalFile() closes the output stream when there is
// any exception while copying. So this may generate partial files on DFS. But that is
// okay because until the main [version].zip file is written, those partial files are
// not going to be used at all. Eventually these files should get cleared.
fs.copyFromLocalFile(
new Path(localFile.getAbsoluteFile.toURI), dfsFile)
val localFileSize = localFile.length()
logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
filesCopied += 1
bytesCopied += localFileSize

val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
localFilesToDfsFiles.put(localFileName, immutableDfsFile)

immutableDfsFile
}
}
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
s" DFS for version $version. $filesReused files reused without copying.")
versionToRocksDBFiles.put(version, immutableFiles)

// clean up deleted SST files from the localFilesToDfsFiles Map
val currentLocalFiles = localFiles.map(_.getName).toSet
val mappingsToClean = localFilesToDfsFiles.asScala
.keys
.filterNot(currentLocalFiles.contains)

mappingsToClean.foreach { f =>
logInfo(s"cleaning $f from the localFilesToDfsFiles map")
localFilesToDfsFiles.remove(f)
}

saveCheckpointMetrics = RocksDBFileManagerMetrics(
bytesCopied = bytesCopied,
filesCopied = filesCopied,
@@ -516,11 +536,22 @@
// Delete unnecessary local immutable files
listRocksDBFiles(localDir)._1
.foreach { existingFile =>
val isSameFile =
requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
existingFile.length() == requiredFile.get.sizeBytes
} else {
false
}

if (!isSameFile) {
existingFile.delete()
logInfo(s"Deleted local file $existingFile")
localFilesToDfsFiles.remove(existingFile.getName)
logInfo(s"Deleted local file $existingFile with size ${existingFile.length()} mapped" +
s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
} else {
logInfo(s"reusing $prevDfsFile present at $existingFile for $requiredFile")
}
}

Expand All @@ -545,6 +576,7 @@ class RocksDBFileManager(
}
filesCopied += 1
bytesCopied += localFileSize
localFilesToDfsFiles.put(localFileName, file)
logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
} else {
filesReused += 1
