
[SPARK-52006][SQL][CORE] Exclude CollectMetricsExec accumulator from Spark UI + event logs + metric heartbeats #50812


Closed
wants to merge 4 commits into apache:master from ivoson:SPARK-52006

Conversation

ivoson
Contributor

@ivoson ivoson commented May 7, 2025

What changes were proposed in this pull request?

[CollectMetricsExec](https://github.com/apache/spark/blob/1d0e82fb59afbc8d846470b980c3e926ad91513b/sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala#L33) uses an [AggregatingAccumulator](https://github.com/apache/spark/blob/1d0e82fb59afbc8d846470b980c3e926ad91513b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala#L33) to emit metrics through a side channel that can be collected on the driver. There are a few issues with this:

  • It is a named accumulator but not an internal one, so its result is shown in the Spark UI. That result is not meaningful to users because it is the `.toString` representation of an UnsafeRow holding an aggregation buffer.
  • The same string representation is also written to event logs on a per-task basis, which is unnecessary.
  • Last, and most important, there is a race condition when the accumulator is serialized by both the executor heartbeat and task result serialization (the heartbeat serializes it first, and task result serialization happens afterwards), which leads to task failures because [TypedImperativeAggregate.serializeAggregateBufferInPlace](https://github.com/apache/spark/blob/1d0e82fb59afbc8d846470b980c3e926ad91513b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L620) is not idempotent; a sketch of this failure mode follows the list.
    [Error stack screenshot](https://github.com/user-attachments/assets/c86bc38e-8e97-495a-8dbd-9dc9e84bf6c4)
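
To make the failure mode concrete, here is a minimal, self-contained Scala sketch (not Spark's actual classes) of an in-place buffer serializer. Like `serializeAggregateBufferInPlace`, it overwrites the live aggregation state with its serialized bytes in the same slot, so the first serialization succeeds and any second one fails:

```
// Minimal sketch (not Spark code): an "in-place" serializer that, like
// TypedImperativeAggregate.serializeAggregateBufferInPlace, replaces the
// live aggregation state with its byte form in the same buffer slot.
class InPlaceBuffer {
  // Holds either the live aggregation state or, after the first
  // serialization, its byte representation.
  private var slot: Any = List(1L, 2L, 3L) // stand-in for real state

  def serializeInPlace(): Unit = slot match {
    case live: List[_] =>
      slot = live.mkString(",").getBytes("UTF-8") // live state destroyed here
    case _: Array[Byte] =>
      // Not idempotent: the live state is already gone.
      throw new IllegalStateException("aggregation buffer already serialized")
  }
}

object RaceDemo extends App {
  val buf = new InPlaceBuffer
  buf.serializeInPlace() // the heartbeat thread serializes first...
  buf.serializeInPlace() // ...then task-result serialization fails
}
```

In Spark the two calls come from different code paths (the executor heartbeat and task result serialization) racing on the same accumulator, which is why the failure only shows up with the timing described above.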

To fix these issues, this PR makes the following changes (a hedged sketch of the exclusion follows the list):

  • Mark the accumulator as an internal metric to exclude it from the Spark UI.
  • Update JsonProtocol to explicitly exclude this accumulator from event logs.
  • Update Executor to explicitly exclude it from heartbeats.
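
The PR's actual diff is not shown on this page; the following is only a hedged sketch of how such an exclusion could look, assuming the accumulator is registered under a known name. The `CollectMetricsFilter` object and `AccumulatorName` constant below are hypothetical; the real PR changes JsonProtocol and Executor directly.

```
import org.apache.spark.util.AccumulatorV2

// Hypothetical helper illustrating the exclusion idea.
object CollectMetricsFilter {
  // Hypothetical marker name; in Spark the real name would come from
  // CollectMetricsExec's accumulator registration.
  val AccumulatorName = "collect_metrics"

  // Drop the observed-metrics accumulator before its updates are written
  // to event logs or shipped with executor heartbeats.
  def excludeObservedMetrics(
      accums: Seq[AccumulatorV2[_, _]]): Seq[AccumulatorV2[_, _]] =
    accums.filterNot(_.name.contains(AccumulatorName))
}
```

Filtering by name works for both paths because `AccumulatorV2` exposes its registered name as an `Option[String]`.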

Why are the changes needed?

To fix the race condition and to remove unnecessary information from the Spark UI and event logs.

Does this PR introduce any user-facing change?

No

How was this patch tested?

A unit test was added. The fix was also verified manually with the repro below on a standalone cluster, since the issue cannot be triggered in local mode:

import org.apache.spark.TaskContext
import org.apache.spark.sql.functions._ // max, min, lit, percentile_approx
import spark.implicits._                // $-column syntax (pre-imported in spark-shell)

val df = spark.range(100)
  .mapPartitions { iter =>
    // Delay task completion so an executor heartbeat fires while the task
    // is still alive and serializes the accumulator first.
    TaskContext.get().addTaskCompletionListener[Unit] { _ =>
      Thread.sleep(30000L)
    }
    iter
  }.toDF("id")
  .observe(
    name = "my_event",
    max($"id").as("max_val"),
    // percentile_approx is backed by a TypedImperativeAggregate, the kind
    // of aggregate whose in-place buffer serialization is not idempotent.
    percentile_approx($"id", lit(0.5), lit(100)),
    percentile_approx($"id", lit(0.5), lit(100)),
    min($"id").as("min_val"))

df.collect()
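
For context on why this repro triggers the race: the 30-second sleep in the completion listener keeps each task alive well past the default executor heartbeat interval (spark.executor.heartbeatInterval, 10 seconds by default), so a heartbeat serializes the percentile_approx aggregation buffers in place before the task result is serialized; the second, non-idempotent serialization then fails. percentile_approx is used because its underlying ApproximatePercentile is a TypedImperativeAggregate, while max and min do not use in-place buffer serialization.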

Was this patch authored or co-authored using generative AI tooling?

No

@ivoson
Contributor Author

ivoson commented May 7, 2025

cc @cloud-fan can you please take a look? Thanks.

@cloud-fan
Contributor

thanks, merging to master/4.0!

@cloud-fan cloud-fan closed this in e74c5c4 May 9, 2025
cloud-fan pushed a commit that referenced this pull request May 9, 2025:
[SPARK-52006][SQL][CORE] Exclude CollectMetricsExec accumulator from Spark UI + event logs + metric heartbeats

Closes #50812 from ivoson/SPARK-52006.

Authored-by: Tengfei Huang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e74c5c4)
Signed-off-by: Wenchen Fan <[email protected]>
@ivoson ivoson deleted the SPARK-52006 branch May 9, 2025 03:36