
Commit 1dab449

Karuppayya Rajendran (karuppayya) authored and committed
[SPARK-53413][SQL] Shuffle cleanup for commands
### What changes were proposed in this pull request?

Changes to clean up shuffles generated from running commands (e.g. writes). This was also brought up by cloud-fan and ulysses-you [here](#45930 (comment)).

### Why are the changes needed?

To clean up shuffles generated from commands.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test added

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52157 from karuppayya/SPARK-53413.

Lead-authored-by: Karuppayya Rajendran <[email protected]>
Co-authored-by: Karuppayya <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 07d987a commit 1dab449

File tree

4 files changed: +59 additions, −10 deletions

4 files changed

+59
-10
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 2 additions & 1 deletion
@@ -3609,7 +3609,8 @@ object SQLConf {
   val CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
     buildConf("spark.sql.classic.shuffleDependency.fileCleanup.enabled")
       .doc("When enabled, shuffle files will be cleaned up at the end of classic " +
-        "SQL executions.")
+        "SQL executions. Note that this cleanup may cause stage retries and regenerate " +
+        "shuffle files if the same dataframe reference is executed again.")
       .version("4.1.0")
       .booleanConf
       .createWithDefault(Utils.isTesting)
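
This flag is the switch for the cleanup. A minimal sketch of enabling it on a local session and running a command write (the `noop` source, also used by the new tests, discards rows but still executes the shuffle); note the key is internal and defaults to true only under testing:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("shuffle-cleanup-demo")
      .config("spark.sql.classic.shuffleDependency.fileCleanup.enabled", "true")
      .getOrCreate()

    // The repartition produces shuffle files; with the flag on, they are
    // removed once this command execution finishes.
    spark.range(100).repartition(10)
      .write.format("noop").mode("overwrite").save()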

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 2 additions & 1 deletion
@@ -150,7 +150,8 @@ class QueryExecution(
       // with the rest of processing of the root plan being just outputting command results,
       // for eagerly executed commands we mark this place as beginning of execution.
       tracker.setReadyForExecution()
-      val qe = sparkSession.sessionState.executePlan(p, mode)
+      val qe = new QueryExecution(sparkSession, p, mode = mode,
+        shuffleCleanupMode = shuffleCleanupMode)
       val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") {
         SQLExecution.withNewExecutionId(qe, Some(name)) {
           qe.executedPlan.executeCollect()
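
The point of this hunk: `sessionState.executePlan` builds the nested `QueryExecution` with the default `shuffleCleanupMode` (`DoNotCleanup`), so eagerly executed commands silently dropped the caller's cleanup mode; constructing the `QueryExecution` directly threads it through. A toy model of that lost-default bug (all names hypothetical, not Spark API):

    sealed trait CleanupMode
    case object DoNotCleanup extends CleanupMode
    case object RemoveShuffleFiles extends CleanupMode

    // Stand-in for QueryExecution: the cleanup mode defaults to DoNotCleanup.
    final case class Execution(plan: String, cleanupMode: CleanupMode = DoNotCleanup)

    // Old shape: the helper takes no mode, so the default always wins.
    def executeViaHelper(plan: String): Execution = Execution(plan)

    // New shape: the caller's mode is passed through explicitly.
    def executeNested(plan: String, mode: CleanupMode): Execution = Execution(plan, mode)

    val outer = Execution("INSERT ...", RemoveShuffleFiles)
    assert(executeViaHelper("body").cleanupMode == DoNotCleanup) // caller's mode lost
    assert(executeNested("body", outer.cleanupMode).cleanupMode == RemoveShuffleFiles)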

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

Lines changed: 19 additions & 7 deletions
@@ -30,6 +30,8 @@ import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PRE
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
 import org.apache.spark.sql.internal.SQLConf
@@ -68,6 +70,17 @@ object SQLExecution extends Logging {
     }
   }

+  private def extractShuffleIds(plan: SparkPlan): Seq[Int] = {
+    plan match {
+      case ae: AdaptiveSparkPlanExec =>
+        ae.context.shuffleIds.asScala.keys.toSeq
+      case nonAdaptivePlan =>
+        nonAdaptivePlan.collect {
+          case exec: ShuffleExchangeLike => exec.shuffleId
+        }
+    }
+  }
+
   /**
    * Wrap an action that will execute "queryExecution" to track all Spark jobs in the body so that
    * we can connect them with an execution.
@@ -177,13 +190,12 @@ object SQLExecution extends Logging {
         if (queryExecution.shuffleCleanupMode != DoNotCleanup
             && isExecutedPlanAvailable) {
           val shuffleIds = queryExecution.executedPlan match {
-            case ae: AdaptiveSparkPlanExec =>
-              ae.context.shuffleIds.asScala.keys
-            case nonAdaptivePlan =>
-              nonAdaptivePlan.collect {
-                case exec: ShuffleExchangeLike =>
-                  exec.shuffleId
-              }
+            case command: V2CommandExec =>
+              command.children.flatMap(extractShuffleIds)
+            case dataWritingCommand: DataWritingCommandExec =>
+              extractShuffleIds(dataWritingCommand.child)
+            case plan =>
+              extractShuffleIds(plan)
           }
           shuffleIds.foreach { shuffleId =>
             queryExecution.shuffleCleanupMode match {
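
Why the new top-level cases matter: for a command, `queryExecution.executedPlan` is the command node itself, and under AQE the shuffle ids live in the `AdaptiveSparkPlanExec` one level down, so the old match never reached them. A self-contained sketch of the dispatch using simplified stand-in plan types (not Spark's classes):

    sealed trait Plan { def children: Seq[Plan] }
    final case class Adaptive(shuffleIds: Seq[Int]) extends Plan { val children = Nil }
    final case class Shuffle(shuffleId: Int, child: Plan) extends Plan { val children = Seq(child) }
    case object Leaf extends Plan { val children = Nil }
    final case class V2Command(children: Seq[Plan]) extends Plan
    final case class WriteCommand(child: Plan) extends Plan { val children = Seq(child) }

    // Mirrors extractShuffleIds: AQE keeps its own registry of shuffle ids;
    // otherwise walk the tree collecting shuffle exchange nodes.
    def extractShuffleIds(plan: Plan): Seq[Int] = plan match {
      case a: Adaptive => a.shuffleIds
      case s: Shuffle  => s.shuffleId +: extractShuffleIds(s.child)
      case other       => other.children.flatMap(extractShuffleIds)
    }

    // Mirrors the new match on executedPlan: unwrap command nodes first.
    def shuffleIdsToClean(executedPlan: Plan): Seq[Int] = executedPlan match {
      case c: V2Command    => c.children.flatMap(extractShuffleIds)
      case w: WriteCommand => extractShuffleIds(w.child)
      case plan            => extractShuffleIds(plan)
    }

    assert(shuffleIdsToClean(WriteCommand(Adaptive(Seq(7)))) == Seq(7)) // AQE under a command
    assert(shuffleIdsToClean(WriteCommand(Shuffle(3, Leaf))) == Seq(3)) // non-adaptive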

sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala

Lines changed: 36 additions & 1 deletion
@@ -20,7 +20,7 @@ import scala.collection.mutable
 import scala.io.Source
 import scala.util.Try

-import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, FastOperator}
+import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, FastOperator, SaveMode}
 import org.apache.spark.sql.catalyst.{QueryPlanningTracker, QueryPlanningTrackerCallback, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace, UnresolvedFunction, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, UnsafeRow}
@@ -327,6 +327,41 @@ class QueryExecutionSuite extends SharedSparkSession {
     }
   }

+  test("SPARK-53413: Cleanup shuffle dependencies for commands") {
+    Seq(true, false).foreach { adaptiveEnabled => {
+      withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString),
+        (SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, true.toString)) {
+        val plan = spark.range(100).repartition(10).logicalPlan
+        val df = Dataset.ofRows(spark, plan)
+        df.write.format("noop").mode(SaveMode.Overwrite).save()
+
+        val blockManager = spark.sparkContext.env.blockManager
+        assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
+        assert(blockManager.diskBlockManager.getAllBlocks().isEmpty)
+      }
+    }
+    }
+  }
+
+  test("SPARK-53413: Cleanup shuffle dependencies for DataWritingCommandExec") {
+    withTempDir { dir =>
+      Seq(true, false).foreach { adaptiveEnabled => {
+        withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString),
+          (SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, true.toString)) {
+          val plan = spark.range(100).repartition(10).logicalPlan
+          val df = Dataset.ofRows(spark, plan)
+          // V1 API write
+          df.write.format("csv").mode(SaveMode.Overwrite).save(dir.getCanonicalPath)
+
+          val blockManager = spark.sparkContext.env.blockManager
+          assert(blockManager.migratableResolver.getStoredShuffles().isEmpty)
+          assert(blockManager.diskBlockManager.getAllBlocks().isEmpty)
+        }
+      }
+      }
+    }
+  }
+
   test("SPARK-47764: Cleanup shuffle dependencies - DoNotCleanup mode") {
     Seq(true, false).foreach { adaptiveEnabled => {
       withSQLConf((SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, adaptiveEnabled.toString)) {
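
The doc-string caveat from the first hunk is worth restating next to these tests: once the shuffle files are cleaned, re-executing the same dataframe reference has to regenerate them. A hedged sketch of that scenario (assumes a local session and the config key introduced above):

    spark.conf.set("spark.sql.classic.shuffleDependency.fileCleanup.enabled", "true")

    val df = spark.range(100).repartition(10)
    df.write.format("noop").mode("overwrite").save() // shuffle files cleaned afterwards

    // Executing the same reference again re-runs the shuffle; per the config
    // doc this may surface as stage retries that regenerate the files.
    df.write.format("noop").mode("overwrite").save()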
