From fb9c89a45db5d6cae53a66aed551234fea78779b Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Mon, 15 Jan 2024 13:46:43 -0800
Subject: [PATCH] [SPARK-46094] Support Executor JVM Profiling

### What changes were proposed in this pull request?

This adds support for the async-profiler to Spark.

### Why are the changes needed?

Profiling JVM applications on a cluster is cumbersome, and saving the profiler output can be complicated, especially if the cluster is on K8s, where the executor pods are removed on completion and any files saved to their local file systems become inaccessible. This feature makes it simple to turn profiling on/off, bundles the jar/binaries needed for profiling, and makes it simple to save the output to an HDFS location.

### Does this PR introduce _any_ user-facing change?

This PR introduces six new configuration parameters. These are described in the documentation.

### How was this patch tested?

Tested manually on EKS.

### Was this patch authored or co-authored using generative AI tooling?

No
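A minimal sketch of how the feature is meant to be enabled (the jar path, DFS directory, and fraction below are illustrative values, not defaults; the plugin class is the one added by this PR):

```
spark-submit \
  --jars <path-to>/spark-profiler_2.13-4.0.0-SNAPSHOT.jar \
  --conf spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin \
  --conf spark.executor.profiling.enabled=true \
  --conf spark.executor.profiling.dfsDir=hdfs:///spark_profiles \
  --conf spark.executor.profiling.fraction=0.25 \
  ...
```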
Closes #44021 from parthchandra/async-profiler-apache-PR.

Lead-authored-by: Parth Chandra
Co-authored-by: Parth Chandra
Signed-off-by: Dongjoon Hyun
---
 assembly/pom.xml                                   |  10 +
 connector/profiler/README.md                       | 119 +++++++++++
 connector/profiler/pom.xml                         |  50 +++++
 .../profiler/ExecutorJVMProfiler.scala             | 185 ++++++++++++++++++
 .../profiler/ExecutorProfilerPlugin.scala          |  69 +++++++
 .../spark/executor/profiler/package.scala          |  71 +++++++
 pom.xml                                            |   7 +
 7 files changed, 511 insertions(+)
 create mode 100644 connector/profiler/README.md
 create mode 100644 connector/profiler/pom.xml
 create mode 100644 connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
 create mode 100644 connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala
 create mode 100644 connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala

diff --git a/assembly/pom.xml b/assembly/pom.xml
index 48fe8f46341e6..77ff87c17f522 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -204,6 +204,16 @@
       </dependencies>
     </profile>
+    <profile>
+      <id>jvm-profiler</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-profiler_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>bigtop-dist</id>
diff --git a/connector/profiler/pom.xml b/connector/profiler/pom.xml
new file mode 100644
--- /dev/null
+++ b/connector/profiler/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.13</artifactId>
+    <version>4.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-profiler_2.13</artifactId>
+  <properties>
+    <sbt.project.name>profiler</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Profiler</name>
+  <url>https://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>me.bechberger</groupId>
+      <artifactId>ap-loader-all</artifactId>
+      <version>2.9-7</version>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
new file mode 100644
index 0000000000000..a5d5b2a1e98d5
--- /dev/null
+++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorJVMProfiler.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor.profiler
+
+import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
+import java.net.URI
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+
+/**
+ * A class that enables the async JVM code profiler.
+ */
+private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) extends Logging {
+
+  private var running = false
+  private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
+  private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
+  private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR)
+  private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
+  private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)
+
+  private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+  private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+  private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+  private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr"
+
+  private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
+  private var outputStream: FSDataOutputStream = _
+  private var inputStream: InputStream = _
+  private val dataBuffer = new Array[Byte](UPLOAD_SIZE)
+  private var threadpool: ScheduledExecutorService = _
+  private var writing: Boolean = false
+
+  val profiler: Option[AsyncProfiler] = {
+    Option(
+      if (enableProfiler && AsyncProfilerLoader.isSupported) AsyncProfilerLoader.load() else null
+    )
+  }
+
+  def start(): Unit = {
+    if (!running) {
+      try {
+        profiler.foreach { p =>
+          p.execute(startcmd)
+          logInfo("Executor JVM profiling started.")
+          running = true
+          startWriting()
+        }
+      } catch {
+        case e @ (_: IllegalArgumentException | _: IllegalStateException | _: IOException) =>
+          logError("JVM profiling aborted. Exception occurred in profiler native code: ", e)
+        case e: Exception => logWarning("Executor JVM profiling aborted due to exception: ", e)
+      }
+    }
+  }
+
+  /** Stops the profiling and saves output to the DFS location. */
+  def stop(): Unit = {
+    if (running) {
+      profiler.foreach { p =>
+        p.execute(stopcmd)
+        logInfo("JVM profiler stopped")
+        running = false
+        finishWriting()
+      }
+    }
+  }
+
+  private def startWriting(): Unit = {
+    if (profilerDfsDir.isDefined) {
+      val applicationId = try {
+        conf.getAppId
+      } catch {
+        case _: NoSuchElementException => "local-" + System.currentTimeMillis
+      }
+      val config = SparkHadoopUtil.get.newConfiguration(conf)
+      val appName = conf.get("spark.app.name").replace(" ", "-")
+      val profilerOutputDirname = profilerDfsDir.get
+
+      val profileOutputFile =
+        s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr"
+      val fs = FileSystem.get(new URI(profileOutputFile), config)
+      val filenamePath = new Path(profileOutputFile)
+      try {
+        // Remove any stale output file before creating the new one.
+        if (fs.exists(filenamePath)) {
+          fs.delete(filenamePath, true)
+        }
+        outputStream = fs.create(filenamePath)
+        logInfo(s"Copying executor profiling file to $profileOutputFile")
+        inputStream = new BufferedInputStream(
+          new FileInputStream(s"$profilerLocalDir/profile.jfr"))
+        threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
+        threadpool.scheduleWithFixedDelay(
+          new Runnable() {
+            override def run(): Unit = writeChunk(false)
+          },
+          writeInterval,
+          writeInterval,
+          TimeUnit.SECONDS)
+        writing = true
+      } catch {
+        case e: Exception =>
+          logError("Failed to start JVM profiler", e)
+          if (threadpool != null) {
+            threadpool.shutdownNow()
+          }
+          if (inputStream != null) {
+            inputStream.close()
+          }
+          if (outputStream != null) {
+            outputStream.close()
+          }
+      }
+    }
+  }
+
+  private def writeChunk(lastChunk: Boolean): Unit = {
+    if (!writing) {
+      return
+    }
+    try {
+      // Stop (pause) the profiler, dump the results and then resume. This is not ideal as we miss
+      // the events while the file is being dumped, but that is the only way to make sure that
+      // the chunk of data we are copying to dfs is in a consistent state.
+      profiler.get.execute(stopcmd)
+      profiler.get.execute(dumpcmd)
+      var remaining = inputStream.available()
+      if (!lastChunk) {
+        profiler.get.execute(resumecmd)
+      }
+      while (remaining > 0) {
+        val read = inputStream.read(dataBuffer, 0, math.min(remaining, UPLOAD_SIZE))
+        outputStream.write(dataBuffer, 0, read)
+        remaining -= read
+      }
+    } catch {
+      case e: IOException => logError("Exception occurred while writing some profiler output: ", e)
+      case e @ (_: IllegalArgumentException | _: IllegalStateException) =>
+        logError("Some profiler output not written." +
+          " Exception occurred in profiler native code: ", e)
+      case e: Exception => logError("Some profiler output not written. Unexpected exception: ", e)
+    }
+  }
+
+  private def finishWriting(): Unit = {
+    if (profilerDfsDir.isDefined && writing) {
+      try {
+        // Shut down the background writer.
+        threadpool.shutdown()
+        threadpool.awaitTermination(30, TimeUnit.SECONDS)
+        // Flush the remaining data.
+        writeChunk(true)
+        inputStream.close()
+        outputStream.close()
+      } catch {
+        case _: InterruptedException => Thread.currentThread().interrupt()
+        case e: IOException =>
+          logWarning("Some profiling output not written. " +
+            "Exception occurred while completing profiler output", e)
+      }
+      writing = false
+    }
+  }
+}
+ + "Exception occurred while completing profiler output", e) + } + writing = false + } + } +} diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala new file mode 100644 index 0000000000000..e144092cdecd2 --- /dev/null +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/ExecutorProfilerPlugin.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.executor.profiler + +import java.util.{Map => JMap} + +import scala.jdk.CollectionConverters._ +import scala.util.Random + +import org.apache.spark.SparkConf +import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} +import org.apache.spark.internal.Logging + + +/** + * Spark plugin to do JVM code profiling of executors + */ +class ExecutorProfilerPlugin extends SparkPlugin { + override def driverPlugin(): DriverPlugin = null + + // No-op + override def executorPlugin(): ExecutorPlugin = new JVMProfilerExecutorPlugin +} + +class JVMProfilerExecutorPlugin extends ExecutorPlugin with Logging { + + private var sparkConf: SparkConf = _ + private var pluginCtx: PluginContext = _ + private var profiler: ExecutorJVMProfiler = _ + private var codeProfilingEnabled: Boolean = _ + private var codeProfilingFraction: Double = _ + private val rand: Random = new Random(System.currentTimeMillis()) + + override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = { + pluginCtx = ctx + sparkConf = ctx.conf() + codeProfilingEnabled = sparkConf.get(EXECUTOR_PROFILING_ENABLED) + if (codeProfilingEnabled) { + codeProfilingFraction = sparkConf.get(EXECUTOR_PROFILING_FRACTION) + if (rand.nextInt(100) * 0.01 < codeProfilingFraction) { + logInfo(s"Executor id ${pluginCtx.executorID()} selected for JVM code profiling") + profiler = new ExecutorJVMProfiler(sparkConf, pluginCtx.executorID()) + profiler.start() + } + } + Map.empty[String, String].asJava + } + + override def shutdown(): Unit = { + logInfo("Executor JVM profiler shutting down") + if (profiler != null) { + profiler.stop() + } + } +} diff --git a/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala new file mode 100644 index 0000000000000..f9adec2d4be90 --- /dev/null +++ b/connector/profiler/src/main/scala/org/apache/spark/executor/profiler/package.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
+package org.apache.spark.executor
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.internal.config.ConfigBuilder
+
+package object profiler {
+
+  private[profiler] val EXECUTOR_PROFILING_ENABLED =
+    ConfigBuilder("spark.executor.profiling.enabled")
+      .doc("Turn on code profiling via async-profiler in executors.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[profiler] val EXECUTOR_PROFILING_DFS_DIR =
+    ConfigBuilder("spark.executor.profiling.dfsDir")
+      .doc("HDFS-compatible file-system path to where the profiler will write output .jfr files.")
+      .version("4.0.0")
+      .stringConf
+      .createOptional
+
+  private[profiler] val EXECUTOR_PROFILING_LOCAL_DIR =
+    ConfigBuilder("spark.executor.profiling.localDir")
+      .doc("Local file system path on the executor where profiler output is saved. Defaults to " +
+        "the working directory of the executor process.")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault(".")
+
+  private[profiler] val EXECUTOR_PROFILING_OPTIONS =
+    ConfigBuilder("spark.executor.profiling.options")
+      .doc("Options to pass on to the async-profiler.")
+      .version("4.0.0")
+      .stringConf
+      .createWithDefault("event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s")
+
+  private[profiler] val EXECUTOR_PROFILING_FRACTION =
+    ConfigBuilder("spark.executor.profiling.fraction")
+      .doc("Fraction of executors to profile.")
+      .version("4.0.0")
+      .doubleConf
+      .checkValue(v => v >= 0.0 && v <= 1.0,
+        "Fraction of executors to profile must be in [0,1]")
+      .createWithDefault(0.1)
+
+  private[profiler] val EXECUTOR_PROFILING_WRITE_INTERVAL =
+    ConfigBuilder("spark.executor.profiling.writeInterval")
+      .doc("Time interval, in seconds, after which the profiler output will be synced to DFS.")
+      .version("4.0.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ >= 0, "Write interval should be non-negative")
+      .createWithDefault(30)
+
+}
diff --git a/pom.xml b/pom.xml
index 120a7fc09a7bd..3eb8b0917bf5b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3670,6 +3670,13 @@
     </profile>
 
+    <profile>
+      <id>jvm-profiler</id>
+      <modules>
+        <module>connector/profiler</module>
+      </modules>
+    </profile>
+
     <profile>
       <id>test-java-home</id>
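For completeness, a sketch of building Spark with the new module enabled via the `jvm-profiler` Maven profile added to the root `pom.xml` above (a standard Spark build invocation; `-DskipTests` is only for brevity):

```
./build/mvn clean package -DskipTests -Pjvm-profiler
```

The resulting `spark-profiler` jar lands under `connector/profiler/target/` and can be passed to `--jars` as in the submission example in the description above.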