From 0775ea732a38c668b8727f56929dad5745962f61 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 14 Jun 2024 20:21:09 -0700 Subject: [PATCH] [SPARK-48611][CORE] Log TID for input split in HadoopRDD and NewHadoopRDD ### What changes were proposed in this pull request? Log `TID` for "input split" in `HadoopRDD` and `NewHadoopRDD` ### Why are the changes needed? This change should benefit both structured logging enabled/disabled cases. When structured logging is disabled, and executor cores > 1, the logs of tasks are mixed in stdout, something like ``` 24/06/12 21:40:10 INFO Executor: Running task 26.0 in stage 2.0 (TID 10) 24/06/12 21:40:10 INFO Executor: Running task 27.0 in stage 2.0 (TID 11) 24/06/12 21:40:11 INFO HadoopRDD: Input split: hdfs://.../part-00025-53bc40ae-399f-4291-b5ac-617c980deb86-c000:0+124138257 24/06/12 21:40:11 INFO HadoopRDD: Input split: hdfs://.../part-00045-53bc40ae-399f-4291-b5ac-617c980deb86-c000:0+121726684 ``` it's hard to say which file is read by which task because they run in parallel. If something goes wrong, the log prints `TID` and exception stack trace, the error may related to the input data, sometimes that `exception message` is clear enough to show which file that input data comes from, but sometimes not, in the latter case, the current log is not clear enough to allow us to identify the bad file quickly. ``` 24/06/12 21:40:18 ERROR Executor: Exception in task 27.0 in stage 2.0 (TID 11) (... exception message) (... stacktraces) ``` When structured logging is enabled, exposing TID as a LogKey makes the logs more selective. ### Does this PR introduce _any_ user-facing change? Yes, it supplies additional information in logs. ### How was this patch tested? Review, as it only touches log contents. ### Was this patch authored or co-authored using generative AI tooling? No Closes #46966 from pan3793/SPARK-48611. Authored-by: Cheng Pan Signed-off-by: Gengliang Wang --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 3 ++- core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index cbfce378879ec..545eafe7a4449 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -270,7 +270,8 @@ class HadoopRDD[K, V]( val iter = new NextIterator[(K, V)] { private val split = theSplit.asInstanceOf[HadoopPartition] - logInfo(log"Input split: ${MDC(INPUT_SPLIT, split.inputSplit)}") + logInfo(log"Task (TID ${MDC(TASK_ID, context.taskAttemptId())}) input split: " + + log"${MDC(INPUT_SPLIT, split.inputSplit)}") private val jobConf = getJobConf() private val inputMetrics = context.taskMetrics().inputMetrics diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 3a1ce4bd1dfde..bf539320b5985 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -197,7 +197,8 @@ class NewHadoopRDD[K, V]( override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new Iterator[(K, V)] { private val split = theSplit.asInstanceOf[NewHadoopPartition] - logInfo(log"Input split: ${MDC(INPUT_SPLIT, split.serializableHadoopSplit)}") + logInfo(log"Task (TID ${MDC(TASK_ID, context.taskAttemptId())}) input split: " + + log"${MDC(INPUT_SPLIT, split.serializableHadoopSplit)}") private val conf = getConf private val inputMetrics = context.taskMetrics().inputMetrics