[SPARK-51072][CORE] CallerContext to set Hadoop cloud audit context
### What changes were proposed in this pull request?

When enabled, the cloud store client audit context is set to the
same context string as the Hadoop IPC caller context.

### Why are the changes needed?

`CallerContext` adds information about the Spark task to the Hadoop IPC caller context, which then flows into HDFS, YARN and HBase server logs.

It is also possible to update the cloud storage "audit context".
Storage clients can attach this audit information to requests so that it is stored in the service's own logs, where it can be retrieved, parsed and used for analysis.

This is currently supported by the S3A connector, which adds the information to a synthetic referrer header that is then stored in the S3 server logs (not CloudTrail, sadly).

See [S3A Auditing](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/auditing.html)
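
For illustration, here is a minimal sketch (not code from this patch) of the Hadoop `CommonAuditContext` API the change builds on; the key and value below are placeholders, while the patch itself stores the full Spark context string under the key `spark`:

```scala
import org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext

// Entries put into the per-thread audit context are picked up by
// auditing-aware connectors such as S3A, which marshal them into the
// synthetic referrer header that ends up in the store's server logs.
currentAuditContext.put("spark", "SPARK_<context-string>")  // placeholder value
val attached = currentAuditContext.get("spark")             // read the entry back
currentAuditContext.reset()                                 // clear all entries
```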

### Does this PR introduce _any_ user-facing change?

If enabled, it adds extra entries to cloud storage server logs via cloud
storage clients that support it.
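
As a hedged example (not part of this patch), the gating option named in the testing notes, `hadoop.caller.context.enabled`, could presumably be enabled for a Spark application through the standard `spark.hadoop.*` prefix:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: the audit-context propagation is gated by the same
// hadoop.caller.context.enabled switch as the IPC caller context; the
// spark.hadoop.* prefix copies the option into the Hadoop Configuration.
val spark = SparkSession.builder()
  .appName("caller-context-audit-demo")  // hypothetical application name
  .config("spark.hadoop.hadoop.caller.context.enabled", "true")
  .getOrCreate()
```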

### How was this patch tested?

Expanded the existing test `"Set Spark CallerContext"` to verify
that all passed-down parameters are set in both the caller and audit contexts.
This required extracting the functional code of `CallerContext.setCurrentContext`
into a `VisibleForTesting private[util]` method `setCurrentContext(Boolean)`.

Without this, the test suite only ran if the process had been launched
with the configuration option `hadoop.caller.context.enabled` set
to true; this is not the default, so the existing test suite code
was probably never executed.
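
A hedged sketch of what that extraction might look like inside `CallerContext` (the exact shape in the final patch may differ):

```scala
// Public entry point keeps its existing behaviour, gated on the
// hadoop.caller.context.enabled setting read at startup.
def setCurrentContext(): Unit =
  setCurrentContext(CallerContext.callerContextEnabled)

// Testable hook: the gate is passed in, so the test suite can force it on.
private[util] def setCurrentContext(enabled: Boolean): Unit = if (enabled) {
  val hdfsContext = new HadoopCallerContextBuilder(context).build()
  HadoopCallerContext.setCurrent(hdfsContext)
  // Mirror the same string into the cloud storage audit context.
  currentAuditContext.put("spark", context)
}
```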

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49779 from steveloughran/SPARK-51072-caller-context-auditing.

Authored-by: Steve Loughran <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
steveloughran authored and dongjoon-hyun committed Mar 4, 2025
1 parent fd1f774 commit 229be37
Showing 2 changed files with 23 additions and 4 deletions.
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -54,6 +54,7 @@ import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext}
import org.apache.hadoop.ipc.CallerContext.{Builder => HadoopCallerContextBuilder}
@@ -3171,6 +3172,9 @@ private[util] object CallerContext extends Logging {
* specific applications impacting parts of the Hadoop system and potential problems they may be
* creating (e.g. overloading NN). As HDFS mentioned in HDFS-9184, for a given HDFS operation, it's
* very helpful to track which upper level job issues it.
* The context information is also set in the audit context for cloud storage
* connectors. If supported, this gets marshalled as part of the HTTP Referrer header
* or similar field, and so ends up in the store service logs themselves.
*
* @param from who sets up the caller context (TASK, CLIENT, APPMASTER)
*
@@ -3221,11 +3225,15 @@ private[spark] class CallerContext(

/**
* Set up the caller context [[context]] by invoking Hadoop CallerContext API of
* [[HadoopCallerContext]].
* [[HadoopCallerContext]], which is included in IPC calls,
* and the Hadoop audit context, which may be included in cloud storage
* requests.
*/
def setCurrentContext(): Unit = if (CallerContext.callerContextEnabled) {
val hdfsContext = new HadoopCallerContextBuilder(context).build()
HadoopCallerContext.setCurrent(hdfsContext)
// set the audit context for object stores, with the prefix "spark"
currentAuditContext.put("spark", context)
}
}

17 changes: 14 additions & 3 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -37,6 +37,7 @@ import org.apache.commons.lang3.SystemUtils
import org.apache.commons.math3.stat.inference.ChiSquareTest
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext
import org.apache.hadoop.ipc.{CallerContext => HadoopCallerContext}
import org.apache.logging.log4j.Level

@@ -1003,9 +1004,19 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties {
}

test("Set Spark CallerContext") {
val context = "test"
new CallerContext(context).setCurrentContext()
assert(s"SPARK_$context" === HadoopCallerContext.getCurrent.toString)
currentAuditContext.reset
new CallerContext("test",
Some("upstream"),
Some("app"),
Some("attempt"),
Some(1),
Some(2),
Some(3),
Some(4),
Some(5)).setCurrentContext()
val expected = s"SPARK_test_app_attempt_JId_1_SId_2_3_TId_4_5_upstream"
assert(expected === HadoopCallerContext.getCurrent.toString)
assert(expected === currentAuditContext.get("spark"))
}

test("encodeFileNameToURIRawPath") {
