Skip to content

Commit

Permalink
[SPARK-51436][SQL] Change the mayInterruptIfRunning from true to false
Browse files Browse the repository at this point in the history
  • Loading branch information
beliefer committed Mar 8, 2025
1 parent c0ec84e commit a6dbeae
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private[consumer] class FetchedDataPool(
}

def reset(): Unit = synchronized {
scheduled.foreach(_.cancel(true))
scheduled.foreach(_.cancel(false))

cache.clear()
numTotalElements.reset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)

override def onStop(): Unit = {
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel(true)
timeoutCheckingTask.cancel(false)
}
eventLoopThread.shutdownNow()
killExecutorThread.shutdownNow()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private[spark] class StandaloneAppClient(

override def onStop(): Unit = {
if (registrationRetryTimer.get != null) {
registrationRetryTimer.get.cancel(true)
registrationRetryTimer.get.cancel(false)
}
registrationRetryThread.shutdownNow()
registerMasterFutures.get.foreach(_.cancel(true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ private[deploy] class Master(
applicationMetricsSystem.report()
// prevent the CompleteRecovery message sending to restarted master
if (recoveryCompletionTask != null) {
recoveryCompletionTask.cancel(true)
recoveryCompletionTask.cancel(false)
}
if (checkForWorkerTimeOutTask != null) {
checkForWorkerTimeOutTask.cancel(true)
checkForWorkerTimeOutTask.cancel(false)
}
forwardMessageThread.shutdownNow()
webUi.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ private[deploy] class Worker(
// We have exceeded the initial registration retry threshold
// All retries from now on should use a higher interval
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
registrationRetryTimer.foreach(_.cancel(true))
registrationRetryTimer.foreach(_.cancel(false))
registrationRetryTimer = Some(
forwardMessageScheduler.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { self.send(ReregisterWithMaster) },
Expand All @@ -426,7 +426,7 @@ private[deploy] class Worker(
registerMasterFutures.foreach(_.cancel(true))
registerMasterFutures = null
}
registrationRetryTimer.foreach(_.cancel(true))
registrationRetryTimer.foreach(_.cancel(false))
registrationRetryTimer = None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private[netty] class NettyRpcEnv(
}
}, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
promise.future.onComplete { v =>
timeoutCancelable.cancel(true)
timeoutCancelable.cancel(false)
}(ThreadUtils.sameThread)
} catch {
case NonFatal(e) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class ExecutorPodsPollingSnapshotSource(
@Since("3.1.3")
def stop(): Unit = {
if (pollingFuture != null) {
pollingFuture.cancel(true)
pollingFuture.cancel(false)
pollingFuture = null
}
ThreadUtils.shutdown(pollingExecutor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ trait BroadcastExchangeLike extends Exchange {
case Some(r) => sparkContext.cancelJobsWithTag(this.jobTag, r)
case None => sparkContext.cancelJobsWithTag(this.jobTag)
}
this.relationFuture.cancel(true)
this.relationFuture.cancel(false)
}
}

Expand Down Expand Up @@ -257,7 +257,7 @@ case class BroadcastExchangeExec(
logError(log"Could not execute broadcast in ${MDC(TIMEOUT, timeout)} secs.", ex)
if (!relationFuture.isDone) {
sparkContext.cancelJobsWithTag(jobTag, "The corresponding broadcast query has failed.")
relationFuture.cancel(true)
relationFuture.cancel(false)
}
throw QueryExecutionErrors.executeBroadcastTimeoutError(timeout, Some(ex))
}
Expand Down

0 comments on commit a6dbeae

Please sign in to comment.