Skip to content

Commit ece5812

Browse files
committed
rebase
1 parent 9d091c0 commit ece5812

File tree

4 files changed

+17
-47
lines changed

4 files changed

+17
-47
lines changed

core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,7 @@ private[spark] class JobWaiter[T](
5151
* all the tasks belonging to this job, it will fail this job with a SparkException.
5252
*/
5353
def cancel(reason: Option[String] = None, quiet: Boolean = false): Unit = {
54-
if (quiet) {
55-
dagScheduler.cancelJob(jobId, reason, quiet)
56-
} else {
57-
dagScheduler.cancelJob(jobId, None, quiet)
58-
}
54+
dagScheduler.cancelJob(jobId, reason, quiet)
5955
}
6056

6157
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ case class AdaptiveSparkPlanExec(
384384
}
385385
})
386386
}
387+
stagesToCancel.clear()
387388
}
388389
}
389390
// Now that some stages have finished, we can try creating new stages.

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,14 @@ trait ShuffleExchangeLike extends Exchange {
7878
private[sql] // Exposed for testing
7979
val futureAction = new AtomicReference[Option[FutureAction[MapOutputStatistics]]](None)
8080

81+
@volatile
8182
@transient
8283
private var isCancelled: Boolean = false
8384

85+
@volatile
86+
@transient
87+
private var quietly: Boolean = false
88+
8489
@transient
8590
private lazy val triggerFuture: java.util.concurrent.Future[Any] = {
8691
SQLExecution.withThreadLocalCaptured(session, ShuffleExchangeExec.executionContext) {
@@ -90,7 +95,7 @@ trait ShuffleExchangeLike extends Exchange {
9095
executeQuery(null)
9196
// Submit shuffle job if not cancelled.
9297
this.synchronized {
93-
if (isCancelled) {
98+
if (isCancelled && !quietly) {
9499
promise.tryFailure(new SparkException("Shuffle cancelled."))
95100
} else {
96101
val shuffleJob = RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
@@ -125,9 +130,15 @@ trait ShuffleExchangeLike extends Exchange {
125130
* Cancels the shuffle job with an optional reason.
126131
*/
127132
final def cancelShuffleJob(reason: Option[String], quiet: Boolean): Unit = this.synchronized {
128-
if (!isCancelled) {
129-
isCancelled = true
130-
futureAction.get().foreach(_.cancel(reason, quiet))
133+
this.synchronized {
134+
if (!isCancelled) {
135+
isCancelled = true
136+
if (quiet) {
137+
quietly = quiet
138+
promise.tryFailure(new SparkAQEStageCancelException)
139+
}
140+
futureAction.get().foreach(_.cancel(reason, quiet))
141+
}
131142
}
132143
}
133144

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -255,44 +255,6 @@ class AdaptiveQueryExecSuite
255255
}
256256
}
257257

258-
test("SPARK-52024: Support AQE Cancel Empty") {
259-
// make the non-empty side compute more time
260-
// so that empty side complete first to trigger the rule
261-
spark.udf.register("fake_udf", (input: Int) => {
262-
Thread.sleep(20)
263-
input
264-
})
265-
var withCancelEmpty = -1L
266-
var withoutCancelEmpty = -1L
267-
Seq(true, false).foreach { enable =>
268-
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
269-
SQLConf.ADAPTIVE_EMPTY_TRIGGER_CANCEL_ENABLED.key -> String.valueOf(enable)) {
270-
val query =
271-
"""
272-
|SELECT t.key1
273-
|FROM emptyTestData join (SELECT testData.key as key1
274-
|FROM testData join testData2 ON fake_udf(testData.key)=fake_udf(testData2.a) ) t
275-
|on t.key1 = emptyTestData.key
276-
|union
277-
|SELECT testData.key
278-
|FROM testData join testData2 ON testData.key=testData2.a
279-
|""".stripMargin
280-
runAdaptiveAndVerifyResult(query)
281-
282-
val start = System.currentTimeMillis()
283-
sql(query).collect()
284-
val end = System.currentTimeMillis()
285-
if (enable) {
286-
withCancelEmpty = end - start
287-
} else {
288-
withoutCancelEmpty = end - start
289-
}
290-
}
291-
}
292-
assert(withCancelEmpty * 2 < withoutCancelEmpty, "withCancelEmpty should be less")
293-
}
294-
295-
296258
test("Reuse the parallelism of coalesced shuffle in local shuffle read") {
297259
withSQLConf(
298260
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",

0 commit comments

Comments
 (0)