# [SPARK-48611][CORE] Log TID for input split in HadoopRDD and NewHadoopRDD

### What changes were proposed in this pull request?

Log `TID` for "input split" in `HadoopRDD` and `NewHadoopRDD`

### Why are the changes needed?

This change benefits both cases: structured logging enabled and disabled.

When structured logging is disabled and an executor runs more than one task concurrently (executor cores > 1), the task logs are interleaved in stdout, for example:

```
24/06/12 21:40:10 INFO Executor: Running task 26.0 in stage 2.0 (TID 10)
24/06/12 21:40:10 INFO Executor: Running task 27.0 in stage 2.0 (TID 11)
24/06/12 21:40:11 INFO HadoopRDD: Input split: hdfs://.../part-00025-53bc40ae-399f-4291-b5ac-617c980deb86-c000:0+124138257
24/06/12 21:40:11 INFO HadoopRDD: Input split: hdfs://.../part-00045-53bc40ae-399f-4291-b5ac-617c980deb86-c000:0+121726684
```
It is hard to tell which file is read by which task because the tasks run in parallel.
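
With this change, each input-split line also carries the task's TID. Illustrative output only: the message format follows the new template in the diff below, but the TID-to-file mapping here is made up.

```
24/06/12 21:40:11 INFO HadoopRDD: Task (TID 10) input split: hdfs://.../part-00025-53bc40ae-399f-4291-b5ac-617c980deb86-c000:0+124138257
24/06/12 21:40:11 INFO HadoopRDD: Task (TID 11) input split: hdfs://.../part-00045-53bc40ae-399f-4291-b5ac-617c980deb86-c000:0+121726684
```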

If something goes wrong, the log prints the `TID` and the exception stack trace. The error may be related to the input data; sometimes the exception message is clear enough to show which file the input came from, but sometimes it is not. In the latter case, the current log does not allow us to identify the bad file quickly.

```
24/06/12 21:40:18 ERROR Executor: Exception in task 27.0 in stage 2.0 (TID 11)
(... exception message)
(... stacktraces)
```

When structured logging is enabled, exposing the TID as a `LogKey` makes the logs easier to filter by task.
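
For illustration, a structured record for the new message could look roughly like the following. The exact field names and layout depend on the configured JSON log4j template, so treat this as a sketch, not Spark's exact output:

```json
{
  "ts": "2024-06-12T21:40:11.000Z",
  "level": "INFO",
  "msg": "Task (TID 10) input split: hdfs://.../part-00025-53bc40ae-399f-4291-b5ac-617c980deb86-c000:0+124138257",
  "context": {
    "task_id": "10",
    "input_split": "hdfs://.../part-00025-53bc40ae-399f-4291-b5ac-617c980deb86-c000:0+124138257"
  },
  "logger": "HadoopRDD"
}
```

All log records for a given task can then be selected by filtering on `context.task_id`.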

### Does this PR introduce _any_ user-facing change?

Yes, it adds the TID to existing log messages.

### How was this patch tested?

Code review, as the change only touches log contents.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46966 from pan3793/SPARK-48611.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
pan3793 authored and gengliangwang committed Jun 15, 2024
1 parent 8ee8aba commit 0775ea7
Showing 2 changed files with 4 additions and 2 deletions.
core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala (2 additions, 1 deletion)

```diff
@@ -270,7 +270,8 @@ class HadoopRDD[K, V](
     val iter = new NextIterator[(K, V)] {
 
       private val split = theSplit.asInstanceOf[HadoopPartition]
-      logInfo(log"Input split: ${MDC(INPUT_SPLIT, split.inputSplit)}")
+      logInfo(log"Task (TID ${MDC(TASK_ID, context.taskAttemptId())}) input split: " +
+        log"${MDC(INPUT_SPLIT, split.inputSplit)}")
       private val jobConf = getJobConf()
 
       private val inputMetrics = context.taskMetrics().inputMetrics
```
core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala (2 additions, 1 deletion)

```diff
@@ -197,7 +197,8 @@ class NewHadoopRDD[K, V](
   override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
     val iter = new Iterator[(K, V)] {
       private val split = theSplit.asInstanceOf[NewHadoopPartition]
-      logInfo(log"Input split: ${MDC(INPUT_SPLIT, split.serializableHadoopSplit)}")
+      logInfo(log"Task (TID ${MDC(TASK_ID, context.taskAttemptId())}) input split: " +
+        log"${MDC(INPUT_SPLIT, split.serializableHadoopSplit)}")
       private val conf = getConf
 
       private val inputMetrics = context.taskMetrics().inputMetrics
```
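
For readers unfamiliar with Spark's structured-logging API, here is a minimal sketch of the pattern the diff uses. It assumes Spark 4.0's internal API (`org.apache.spark.internal.MDC`, the `log` interpolator from the `Logging` trait, and key constants under `org.apache.spark.internal.LogKeys`; import paths may differ by version), and `SplitLogger` is a hypothetical class, not part of Spark:

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.{INPUT_SPLIT, TASK_ID}

// Hypothetical helper, for illustration only.
class SplitLogger extends Logging {
  def logSplit(tid: Long, split: String): Unit = {
    // Each MDC(key, value) is rendered into the plain-text message; when
    // structured logging is enabled, the key/value pair also lands in the
    // record's context map, so logs can be filtered by task_id or input_split.
    logInfo(log"Task (TID ${MDC(TASK_ID, tid)}) input split: ${MDC(INPUT_SPLIT, split)}")
  }
}
```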
