[SPARK-46216][CORE] Improve FileSystemPersistenceEngine to support compressions

### What changes were proposed in this pull request?

This PR improves `FileSystemPersistenceEngine` to support compression via a new configuration, `spark.deploy.recoveryCompressionCodec`.
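As a rough illustration, the new key might be set next to the existing FILESYSTEM recovery settings. The fragment below assumes the Master reads these properties from `conf/spark-defaults.conf`; the same keys can also be passed as `-D` system properties through `SPARK_DAEMON_JAVA_OPTS`. The directory is a placeholder, and `lz4` is one of the short codec names listed in the configuration's doc string.

```properties
# Hypothetical Master settings; the recovery directory is a placeholder path.
spark.deploy.recoveryMode               FILESYSTEM
spark.deploy.recoveryDirectory          /path/to/recovery
# Leave this key unset to keep the uncompressed behavior.
spark.deploy.recoveryCompressionCodec   lz4
```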

### Why are the changes needed?

To allow users to choose a suitable compression codec for their workloads. With `JavaSerializer`, `LZ4` compression is about **2x** faster than the uncompressed baseline (209 ms vs. 431 ms best time in the benchmark below).
```
OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
AMD EPYC 7763 64-Core Processor
2000 Workers:                                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------------------
ZooKeeperPersistenceEngine with JavaSerializer                     2276           2360         115          0.0     1137909.6       1.0X
ZooKeeperPersistenceEngine with KryoSerializer                     1883           1906          34          0.0      941364.2       1.2X
FileSystemPersistenceEngine with JavaSerializer                     431            436           7          0.0      215436.9       5.3X
FileSystemPersistenceEngine with JavaSerializer (lz4)               209            216           9          0.0      104404.1      10.9X
FileSystemPersistenceEngine with JavaSerializer (lzf)               199            202           2          0.0       99489.5      11.4X
FileSystemPersistenceEngine with JavaSerializer (snappy)            192            199           9          0.0       95872.9      11.9X
FileSystemPersistenceEngine with JavaSerializer (zstd)              258            264           6          0.0      129249.4       8.8X
FileSystemPersistenceEngine with KryoSerializer                     139            151          13          0.0       69374.5      16.4X
FileSystemPersistenceEngine with KryoSerializer (lz4)               159            165           8          0.0       79588.9      14.3X
FileSystemPersistenceEngine with KryoSerializer (lzf)               180            195          18          0.0       89844.0      12.7X
FileSystemPersistenceEngine with KryoSerializer (snappy)            164            183          18          0.0       82016.0      13.9X
FileSystemPersistenceEngine with KryoSerializer (zstd)              206            218          11          0.0      102838.9      11.1X
BlackHolePersistenceEngine                                            0              0           0         35.1          28.5   39908.5X
```
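The per-codec effect can be read off the `Best Time(ms)` column by dividing the uncompressed time by the compressed time for the same serializer. The sketch below does that arithmetic on the values copied from the table above; the object and method names are made up for illustration and are not part of the patch.

```scala
object RecoveryCodecSpeedup {
  // Best Time (ms) of FileSystemPersistenceEngine, copied from the JDK 17 table above.
  val javaBestMs: Map[String, Int] =
    Map("none" -> 431, "lz4" -> 209, "lzf" -> 199, "snappy" -> 192, "zstd" -> 258)
  val kryoBestMs: Map[String, Int] =
    Map("none" -> 139, "lz4" -> 159, "lzf" -> 180, "snappy" -> 164, "zstd" -> 206)

  /** Speedup of each codec relative to the uncompressed run of the same serializer. */
  def speedups(bestMs: Map[String, Int]): Map[String, Double] = {
    val baseline = bestMs("none").toDouble
    (bestMs - "none").map { case (codec, ms) => codec -> baseline / ms }
  }

  def main(args: Array[String]): Unit = {
    val tables = Seq("JavaSerializer" -> javaBestMs, "KryoSerializer" -> kryoBestMs)
    for ((label, table) <- tables) {
      // Values above 1.0 mean the codec is faster than no compression.
      speedups(table).toSeq.sortBy(_._1).foreach { case (codec, x) =>
        println(f"$label%-15s $codec%-7s $x%.2fx")
      }
    }
  }
}
```

For `JavaSerializer` every codec comes out ahead of the uncompressed run, while for `KryoSerializer` every ratio is below 1.0 in this benchmark, so the benefit depends on the serializer in use.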

### Does this PR introduce _any_ user-facing change?

No, this is a new feature.

### How was this patch tested?

Pass the CIs with the newly added test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44129 from dongjoon-hyun/SPARK-46216.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun committed Dec 3, 2023
1 parent a3d7019 commit 3da2e5c
Showing 8 changed files with 118 additions and 21 deletions.
22 changes: 15 additions & 7 deletions core/benchmarks/PersistenceEngineBenchmark-jdk21-results.txt
@@ -4,12 +4,20 @@ PersistenceEngineBenchmark

 OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
-1000 Workers:                                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
-ZooKeeperPersistenceEngine with JavaSerializer            1100           1255         150          0.0     1099532.9       1.0X
-ZooKeeperPersistenceEngine with KryoSerializer             946            967          20          0.0      946367.3       1.2X
-FileSystemPersistenceEngine with JavaSerializer            218            223           4          0.0      217851.5       5.0X
-FileSystemPersistenceEngine with KryoSerializer             79             87          12          0.0       78611.1      14.0X
-BlackHolePersistenceEngine                                   0              0           0         42.0          23.8   46191.1X
+2000 Workers:                                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------------
+ZooKeeperPersistenceEngine with JavaSerializer                     2254           2329         119          0.0     1126867.1       1.0X
+ZooKeeperPersistenceEngine with KryoSerializer                     1911           1912           1          0.0      955667.1       1.2X
+FileSystemPersistenceEngine with JavaSerializer                     438            448          15          0.0      218868.1       5.1X
+FileSystemPersistenceEngine with JavaSerializer (lz4)               187            195           8          0.0       93337.8      12.1X
+FileSystemPersistenceEngine with JavaSerializer (lzf)               193            216          20          0.0       96678.8      11.7X
+FileSystemPersistenceEngine with JavaSerializer (snappy)            175            183          10          0.0       87652.3      12.9X
+FileSystemPersistenceEngine with JavaSerializer (zstd)              243            255          14          0.0      121695.2       9.3X
+FileSystemPersistenceEngine with KryoSerializer                     150            160          15          0.0       75089.7      15.0X
+FileSystemPersistenceEngine with KryoSerializer (lz4)               170            177          10          0.0       84996.7      13.3X
+FileSystemPersistenceEngine with KryoSerializer (lzf)               192            203          12          0.0       96019.1      11.7X
+FileSystemPersistenceEngine with KryoSerializer (snappy)            184            202          16          0.0       92241.3      12.2X
+FileSystemPersistenceEngine with KryoSerializer (zstd)              232            238           5          0.0      116075.2       9.7X
+BlackHolePersistenceEngine                                            0              0           0         27.3          36.6   30761.0X


22 changes: 15 additions & 7 deletions core/benchmarks/PersistenceEngineBenchmark-results.txt
@@ -4,12 +4,20 @@ PersistenceEngineBenchmark

 OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1051-azure
 AMD EPYC 7763 64-Core Processor
-1000 Workers:                                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
-ZooKeeperPersistenceEngine with JavaSerializer            1202           1298         138          0.0     1201614.2       1.0X
-ZooKeeperPersistenceEngine with KryoSerializer             951           1004          48          0.0      950559.0       1.3X
-FileSystemPersistenceEngine with JavaSerializer            212            217           6          0.0      211623.2       5.7X
-FileSystemPersistenceEngine with KryoSerializer             79             81           2          0.0       79132.5      15.2X
-BlackHolePersistenceEngine                                   0              0           0         30.9          32.4   37109.8X
+2000 Workers:                                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------------------------------------------------------
+ZooKeeperPersistenceEngine with JavaSerializer                     2276           2360         115          0.0     1137909.6       1.0X
+ZooKeeperPersistenceEngine with KryoSerializer                     1883           1906          34          0.0      941364.2       1.2X
+FileSystemPersistenceEngine with JavaSerializer                     431            436           7          0.0      215436.9       5.3X
+FileSystemPersistenceEngine with JavaSerializer (lz4)               209            216           9          0.0      104404.1      10.9X
+FileSystemPersistenceEngine with JavaSerializer (lzf)               199            202           2          0.0       99489.5      11.4X
+FileSystemPersistenceEngine with JavaSerializer (snappy)            192            199           9          0.0       95872.9      11.9X
+FileSystemPersistenceEngine with JavaSerializer (zstd)              258            264           6          0.0      129249.4       8.8X
+FileSystemPersistenceEngine with KryoSerializer                     139            151          13          0.0       69374.5      16.4X
+FileSystemPersistenceEngine with KryoSerializer (lz4)               159            165           8          0.0       79588.9      14.3X
+FileSystemPersistenceEngine with KryoSerializer (lzf)               180            195          18          0.0       89844.0      12.7X
+FileSystemPersistenceEngine with KryoSerializer (snappy)            164            183          18          0.0       82016.0      13.9X
+FileSystemPersistenceEngine with KryoSerializer (zstd)              206            218          11          0.0      102838.9      11.1X
+BlackHolePersistenceEngine                                            0              0           0         35.1          28.5   39908.5X


@@ -23,6 +23,7 @@ import java.nio.file.{Files, Paths}
 import scala.reflect.ClassTag

 import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
@@ -37,7 +38,8 @@ import org.apache.spark.util.Utils
  */
 private[master] class FileSystemPersistenceEngine(
     val dir: String,
-    val serializer: Serializer)
+    val serializer: Serializer,
+    val codec: Option[CompressionCodec] = None)
   extends PersistenceEngine with Logging {

   Files.createDirectories(Paths.get(dir))
@@ -62,7 +64,8 @@ private[master] class FileSystemPersistenceEngine(
     if (file.exists()) { throw new IllegalStateException("File already exists: " + file) }
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
-    val fileOut = new FileOutputStream(file)
+    var fileOut: OutputStream = new FileOutputStream(file)
+    codec.foreach { c => fileOut = c.compressedOutputStream(fileOut) }
     var out: SerializationStream = null
     Utils.tryWithSafeFinally {
       out = serializer.newInstance().serializeStream(fileOut)
@@ -76,7 +79,8 @@ private[master] class FileSystemPersistenceEngine(
   }

   private def deserializeFromFile[T](file: File)(implicit m: ClassTag[T]): T = {
-    val fileIn = new FileInputStream(file)
+    var fileIn: InputStream = new FileInputStream(file)
+    codec.foreach { c => fileIn = c.compressedInputStream(new FileInputStream(file)) }
     var in: DeserializationStream = null
     try {
       in = serializer.newInstance().deserializeStream(fileIn)
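The optional-codec pattern in the hunks above can be sketched outside Spark as follows. This is a simplified illustration under stated assumptions, not the Spark code: `StreamCodec` and `GzipCodec` are hypothetical stand-ins for `CompressionCodec`, JDK GZIP replaces the Spark codecs, and Java object streams replace the configurable `Serializer`. On both the write and read paths the sketch wraps the stream it has already opened.

```scala
import java.io.{File, FileInputStream, FileOutputStream, InputStream}
import java.io.{ObjectInputStream, ObjectOutputStream, OutputStream}
import java.nio.file.Files
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object OptionalCodecSketch {
  // Hypothetical stand-in for Spark's CompressionCodec: only the two wrapping methods.
  trait StreamCodec {
    def compressedOutputStream(s: OutputStream): OutputStream
    def compressedInputStream(s: InputStream): InputStream
  }

  // JDK GZIP is used only so that the sketch runs without Spark on the classpath.
  object GzipCodec extends StreamCodec {
    def compressedOutputStream(s: OutputStream): OutputStream = new GZIPOutputStream(s)
    def compressedInputStream(s: InputStream): InputStream = new GZIPInputStream(s)
  }

  def write(file: File, value: String, codec: Option[StreamCodec]): Unit = {
    val raw: OutputStream = new FileOutputStream(file)
    // With a codec, wrap the opened stream; without one, write the raw bytes.
    val out = new ObjectOutputStream(codec.fold(raw)(_.compressedOutputStream(raw)))
    try out.writeObject(value) finally out.close()
  }

  def read(file: File, codec: Option[StreamCodec]): String = {
    val raw: InputStream = new FileInputStream(file)
    val in = new ObjectInputStream(codec.fold(raw)(_.compressedInputStream(raw)))
    try in.readObject().asInstanceOf[String] finally in.close()
  }

  // Writes the value to a temporary file and reads it back with the same codec.
  def roundTrip(value: String, codec: Option[StreamCodec]): String = {
    val file = Files.createTempFile("worker_", ".bin").toFile
    try { write(file, value, codec); read(file, codec) } finally file.delete()
  }
}
```

In this sketch a file must be read with the same codec setting it was written with; `roundTrip("worker_1", Some(GzipCodec))` and `roundTrip("worker_1", None)` both return the original string because each uses one setting for both directions.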
@@ -20,7 +20,8 @@ package org.apache.spark.deploy.master
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.Deploy.RECOVERY_DIRECTORY
+import org.apache.spark.internal.config.Deploy.{RECOVERY_COMPRESSION_CODEC, RECOVERY_DIRECTORY}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer

 /**
@@ -57,7 +58,8 @@ private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, serializer:

   def createPersistenceEngine(): PersistenceEngine = {
     logInfo("Persisting recovery state to directory: " + recoveryDir)
-    new FileSystemPersistenceEngine(recoveryDir, serializer)
+    val codec = conf.get(RECOVERY_COMPRESSION_CODEC).map(c => CompressionCodec.createCodec(conf, c))
+    new FileSystemPersistenceEngine(recoveryDir, serializer, codec)
   }

   def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
@@ -39,6 +39,13 @@ private[spark] object Deploy {
     .checkValues(RecoverySerializer.values.map(_.toString))
     .createWithDefault(RecoverySerializer.JAVA.toString)

+  val RECOVERY_COMPRESSION_CODEC = ConfigBuilder("spark.deploy.recoveryCompressionCodec")
+    .doc("A compression codec for persistence engines. none (default), lz4, lzf, snappy, and " +
+      "zstd. Currently, only FILESYSTEM mode supports this configuration.")
+    .version("4.0.0")
+    .stringConf
+    .createOptional
+
   val RECOVERY_MODE_FACTORY = ConfigBuilder("spark.deploy.recoveryMode.factory")
     .version("1.2.0")
     .stringConf
@@ -45,6 +45,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
+import org.apache.spark.io.LZ4CompressionCodec
 import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceRequirement}
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
@@ -346,6 +347,45 @@ class MasterSuite extends SparkFunSuite
     }
   }

+  test("SPARK-46216: Recovery without compression") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "FILESYSTEM")
+    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+
+    var master: Master = null
+    try {
+      master = makeAliveMaster(conf)
+      val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
+      assert(e.codec.isEmpty)
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+      }
+    }
+  }
+
+  test("SPARK-46216: Recovery with compression") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set(RECOVERY_MODE, "FILESYSTEM")
+    conf.set(RECOVERY_DIRECTORY, System.getProperty("java.io.tmpdir"))
+    conf.set(RECOVERY_COMPRESSION_CODEC, "lz4")
+
+    var master: Master = null
+    try {
+      master = makeAliveMaster(conf)
+      val e = master.invokePrivate(_persistenceEngine()).asInstanceOf[FileSystemPersistenceEngine]
+      assert(e.codec.get.isInstanceOf[LZ4CompressionCodec])
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+      }
+    }
+  }
+
   test("master/worker web ui available") {
     implicit val formats = org.json4s.DefaultFormats
     val conf = new SparkConf()
@@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
 import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.util.Utils
@@ -52,7 +53,7 @@ object PersistenceEngineBenchmark extends BenchmarkBase {
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {

     val numIters = 3
-    val numWorkers = 1000
+    val numWorkers = 2000
     val workers = (1 to numWorkers).map(createWorkerInfo).toArray

     conf.set(ZOOKEEPER_URL, zkTestServer.getConnectString)
@@ -73,14 +74,28 @@ object PersistenceEngineBenchmark extends BenchmarkBase {

       serializers.foreach { serializer =>
         val serializerName = serializer.getClass.getSimpleName
-        benchmark.addCase(s"FileSystemPersistenceEngine with $serializerName", numIters) { _ =>
+        val name = s"FileSystemPersistenceEngine with $serializerName"
+        benchmark.addCase(name, numIters) { _ =>
           val dir = Utils.createTempDir().getAbsolutePath
           val engine = new FileSystemPersistenceEngine(dir, serializer)
           workers.foreach(engine.addWorker)
           engine.read[WorkerInfo]("worker_")
           workers.foreach(engine.removeWorker)
           engine.close()
         }
+        CompressionCodec.ALL_COMPRESSION_CODECS.foreach { c =>
+          val codec = CompressionCodec.createCodec(conf, c)
+          val shortCodecName = CompressionCodec.getShortName(c)
+          val name = s"FileSystemPersistenceEngine with $serializerName ($shortCodecName)"
+          benchmark.addCase(name, numIters) { _ =>
+            val dir = Utils.createTempDir().getAbsolutePath
+            val engine = new FileSystemPersistenceEngine(dir, serializer, Some(codec))
+            workers.foreach(engine.addWorker)
+            engine.read[WorkerInfo]("worker_")
+            workers.foreach(engine.removeWorker)
+            engine.close()
+          }
+        }
       }

       benchmark.addCase("BlackHolePersistenceEngine", numIters) { _ =>
@@ -25,6 +25,7 @@ import org.apache.curator.test.TestingServer

 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.Deploy.ZOOKEEPER_URL
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, Serializer}
 import org.apache.spark.util.Utils
@@ -74,6 +75,18 @@ class PersistenceEngineSuite extends SparkFunSuite {
     }
   }

+  test("SPARK-46216: FileSystemPersistenceEngine with compression") {
+    val conf = new SparkConf()
+    CompressionCodec.ALL_COMPRESSION_CODECS.foreach { c =>
+      val codec = CompressionCodec.createCodec(conf, c)
+      withTempDir { dir =>
+        testPersistenceEngine(conf, serializer =>
+          new FileSystemPersistenceEngine(dir.getAbsolutePath, serializer, Some(codec))
+        )
+      }
+    }
+  }
+
   test("ZooKeeperPersistenceEngine") {
     val conf = new SparkConf()
     // TestingServer logs the port conflict exception rather than throwing an exception.
