Skip to content

Commit

Permalink
[SPARK-46094] Support Executor JVM Profiling
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This adds support for the async profiler to Spark

### Why are the changes needed?

Profiling of JVM applications on a cluster is cumbersome and it can be complicated to save the output of the profiler especially if the cluster is on K8s where the executor pods are removed and any files saved to the local file system become inaccessible. This feature makes it simple to turn profiling on/off, includes the jar/binaries needed for profiling,  and makes it simple to save output to an HDFS location.

### Does this PR introduce _any_ user-facing change?
This PR introduces three new configuration parameters. These are described in the documentation.

### How was this patch tested?
Tested manually on EKS

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44021 from parthchandra/async-profiler-apache-PR.

Lead-authored-by: Parth Chandra <[email protected]>
Co-authored-by: Parth Chandra <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
2 people authored and dongjoon-hyun committed Jan 15, 2024
1 parent b095960 commit fb9c89a
Show file tree
Hide file tree
Showing 7 changed files with 511 additions and 0 deletions.
10 changes: 10 additions & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>jvm-profiler</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-profiler_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>bigtop-dist</id>
<!-- This profile uses the assembly plugin to create a special "dist" package for BigTop
Expand Down
119 changes: 119 additions & 0 deletions connector/profiler/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Spark JVM Profiler Plugin

## Build

To build
```
./build/mvn clean package -DskipTests -Pjvm-profiler
```

## Executor Code Profiling

The spark profiler module enables code profiling of executors in cluster mode based on the the [async profiler](https://github.com/async-profiler/async-profiler/blob/v2.10/README.md), a low overhead sampling profiler. This allows a Spark application to capture CPU and memory profiles for application running on a cluster which can later be analyzed for performance issues. The profiler captures [Java Flight Recorder (jfr)](https://access.redhat.com/documentation/es-es/red_hat_build_of_openjdk/17/html/using_jdk_flight_recorder_with_red_hat_build_of_openjdk/openjdk-flight-recorded-overview) files for each executor; these can be read by many tools including Java Mission Control and Intellij.

The profiler writes the jfr files to the executor's working directory in the executor's local file system and the files can grow to be large so it is advisable that the executor machines have adequate storage. The profiler can be configured to copy the jfr files to a hdfs location before the executor shuts down.

Code profiling is currently only supported for

* Linux (x64)
* Linux (arm 64)
* Linux (musl, x64)
* MacOS

To get maximum profiling information set the following jvm options for the executor :

```
-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+PreserveFramePointer
```

For more information on async_profiler see the [Async Profiler Manual](https://krzysztofslusarski.github.io/2022/12/12/async-manual.html)


To enable code profiling, first enable the code profiling plugin via

```
spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin
```

Then enable the profiling in the configuration.


### Code profiling configuration

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.executor.profiling.enabled</code></td>
<td><code>false</code></td>
<td>
If true, will enable code profiling
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.dfsDir</code></td>
<td>(none)</td>
<td>
An HDFS compatible path to which the profiler's output files are copied. The output files will be written as <i>dfsDir/application_id/profile-appname-exec-executor_id.jfr</i> <br/>
If no <i>dfsDir</i> is specified then the files are not copied over. Users should ensure there is sufficient disk space available otherwise it may lead to corrupt jfr files.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.localDir</code></td>
<td><code>.</code> i.e. the executor's working dir</td>
<td>
The local directory in the executor container to write the jfr files to. If not specified the file will be written to the executor's working directory. Users should ensure there is sufficient disk space available on the system as running out of space may result in corrupt jfr file and even cause jobs to fail on systems like K8s.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.options</code></td>
<td>event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s</td>
<td>
Options to pass to the profiler. Detailed options are documented in the comments here:
<a href="https://github.com/async-profiler/async-profiler/blob/32601bccd9e49adda9510a2ed79d142ac6ef0ff9/src/arguments.cpp#L52">Profiler arguments</a>.
Note that the options to start, stop, specify output format, and output file do not have to be specified.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.fraction</code></td>
<td>0.10</td>
<td>
The fraction of executors on which to enable code profiling. The executors to be profiled are picked at random.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.executor.profiling.writeInterval</code></td>
<td>30</td>
<td>
Time interval, in seconds, after which the profiler output will be synced to dfs.
</td>
<td>4.0.0</td>
</tr>
</table>

### Kubernetes
On Kubernetes, spark will try to shut down the executor pods while the profiler files are still being saved. To prevent this set
```
spark.kubernetes.executor.deleteOnTermination=false
```

### Example
```
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
-c spark.executor.extraJavaOptions="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+PreserveFramePointer" \
-c spark.plugins=org.apache.spark.executor.profiler.ExecutorProfilerPlugin \
-c spark.executor.profiling.enabled=true \
-c spark.executor.profiling.dfsDir=s3a://my-bucket/spark/profiles/ \
-c spark.executor.profiling.options=event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s \
-c spark.executor.profiling.fraction=0.10 \
-c spark.kubernetes.executor.deleteOnTermination=false \
<application-jar> \
[application-arguments]
```
50 changes: 50 additions & 0 deletions connector/profiler/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.13</artifactId>
<version>4.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>spark-profiler_2.13</artifactId>
<properties>
<sbt.project.name>profiler</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Profiler</name>
<url>https://spark.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- async-profiler loader contains async_profiler binaries for multiple platforms -->
<dependency>
<groupId>me.bechberger</groupId>
<artifactId>ap-loader-all</artifactId>
<version>2.9-7</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor.profiler

import java.io.{BufferedInputStream, FileInputStream, InputStream, IOException}
import java.net.URI
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import one.profiler.{AsyncProfiler, AsyncProfilerLoader}
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils


/**
* A class that enables the async JVM code profiler
*/
private[spark] class ExecutorJVMProfiler(conf: SparkConf, executorId: String) extends Logging {

private var running = false
private val enableProfiler = conf.get(EXECUTOR_PROFILING_ENABLED)
private val profilerOptions = conf.get(EXECUTOR_PROFILING_OPTIONS)
private val profilerDfsDir = conf.get(EXECUTOR_PROFILING_DFS_DIR)
private val profilerLocalDir = conf.get(EXECUTOR_PROFILING_LOCAL_DIR)
private val writeInterval = conf.get(EXECUTOR_PROFILING_WRITE_INTERVAL)

private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"
private val dumpcmd = s"dump,$profilerOptions,file=$profilerLocalDir/profile.jfr"
private val resumecmd = s"resume,$profilerOptions,file=$profilerLocalDir/profile.jfr"

private val UPLOAD_SIZE = 8 * 1024 * 1024 // 8 MB
private var outputStream: FSDataOutputStream = _
private var inputStream: InputStream = _
private val dataBuffer = new Array[Byte](UPLOAD_SIZE)
private var threadpool: ScheduledExecutorService = _
private var writing: Boolean = false

val profiler: Option[AsyncProfiler] = {
Option(
if (enableProfiler && AsyncProfilerLoader.isSupported) AsyncProfilerLoader.load() else null
)
}

def start(): Unit = {
if (!running) {
try {
profiler.foreach(p => {
p.execute(startcmd)
logInfo("Executor JVM profiling started.")
running = true
startWriting()
})
} catch {
case e @ (_: IllegalArgumentException | _: IllegalStateException | _: IOException) =>
logError("JVM profiling aborted. Exception occurred in profiler native code: ", e)
case e: Exception => logWarning("Executor JVM profiling aborted due to exception: ", e)
}
}
}

/** Stops the profiling and saves output to dfs location. */
def stop(): Unit = {
if (running) {
profiler.foreach(p => {
p.execute(stopcmd)
logInfo("JVM profiler stopped")
running = false
finishWriting()
})
}
}

private def startWriting(): Unit = {
if (profilerDfsDir.isDefined) {
val applicationId = try {
conf.getAppId
} catch {
case _: NoSuchElementException => "local-" + System.currentTimeMillis
}
val config = SparkHadoopUtil.get.newConfiguration(conf)
val appName = conf.get("spark.app.name").replace(" ", "-")
val profilerOutputDirname = profilerDfsDir.get

val profileOutputFile =
s"$profilerOutputDirname/$applicationId/profile-$appName-exec-$executorId.jfr"
val fs = FileSystem.get(new URI(profileOutputFile), config);
val filenamePath = new Path(profileOutputFile)
outputStream = fs.create(filenamePath)
try {
if (fs.exists(filenamePath)) {
fs.delete(filenamePath, true)
}
logInfo(s"Copying executor profiling file to $profileOutputFile")
inputStream = new BufferedInputStream(new FileInputStream(s"$profilerLocalDir/profile.jfr"))
threadpool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("profilerOutputThread")
threadpool.scheduleWithFixedDelay(
new Runnable() {
override def run(): Unit = writeChunk(false)
},
writeInterval,
writeInterval,
TimeUnit.SECONDS)
writing = true
} catch {
case e: Exception =>
logError("Failed to start JVM profiler", e)
if (threadpool != null) {
threadpool.shutdownNow()
}
if (inputStream != null) {
inputStream.close()
}
if (outputStream != null) {
outputStream.close()
}
}
}
}

private def writeChunk(lastChunk: Boolean): Unit = {
if (!writing) {
return
}
try {
// stop (pause) the profiler, dump the results and then resume. This is not ideal as we miss
// the events while the file is being dumped, but that is the only way to make sure that
// the chunk of data we are copying to dfs is in a consistent state.
profiler.get.execute(stopcmd)
profiler.get.execute(dumpcmd)
var remaining = inputStream.available()
if (!lastChunk) {
profiler.get.execute(resumecmd)
}
while (remaining > 0) {
val read = inputStream.read(dataBuffer, 0, math.min(remaining, UPLOAD_SIZE))
outputStream.write(dataBuffer, 0, read)
remaining -= read
}
} catch {
case e: IOException => logError("Exception occurred while writing some profiler output: ", e)
case e @ (_: IllegalArgumentException | _: IllegalStateException) =>
logError("Some profiler output not written." +
" Exception occurred in profiler native code: ", e)
case e: Exception => logError("Some profiler output not written. Unexpected exception: ", e)
}
}

private def finishWriting(): Unit = {
if (profilerDfsDir.isDefined && writing) {
try {
// shutdown background writer
threadpool.shutdown()
threadpool.awaitTermination(30, TimeUnit.SECONDS)
// flush remaining data
writeChunk(true)
inputStream.close()
outputStream.close()
} catch {
case _: InterruptedException => Thread.currentThread().interrupt()
case e: IOException =>
logWarning("Some profiling output not written." +
"Exception occurred while completing profiler output", e)
}
writing = false
}
}
}
Loading

0 comments on commit fb9c89a

Please sign in to comment.