Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future specified mayInterruptIfRunning with true #50209

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private[consumer] class FetchedDataPool(
}

def reset(): Unit = synchronized {
scheduled.foreach(_.cancel(true))
scheduled.foreach(_.cancel(false))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

      val future = executorService.scheduleAtFixedRate(() => {
        Utils.tryLogNonFatalError(removeIdleFetchedData())
      }, 0, evictorThreadRunIntervalMillis, TimeUnit.MILLISECONDS)
      Some(future)

There is no method called in the task that will throw InterruptdEException.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this though? if it were interruptible, we'd want to interrupt for sure.
What is the issue if you try to interrupt something and it doesn't do anything?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tasks can't response interrupt even if we want to interrupt them. cancel(true) causes thread interrupt, but it will not affect these tasks that can't response interrupt. In addition, interrupting operations have extra overhead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If true, then this change doesn't do anything, right? I wonder what problem this change is solving.

Copy link
Contributor Author

@beliefer beliefer Mar 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR want avoid the overhead of the invalid thread interruption.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there overhead? it just sets the interrupted status of the thread.
I'm worried that, if the code changed in some way that would make it interruptible, then we lose the ability to interrupt on cancel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One hand, I want avoid the overhead of the invalid thread interruption, even if the overhead is not big enough.
On the other hand, I suggest the reasonable use of interrupts: use interrupt mechanisms only when needed to avoid excessive dependence.


cache.clear()
numTotalElements.reset()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)

override def onStop(): Unit = {
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel(true)
timeoutCheckingTask.cancel(false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(
  () => Utils.tryLogNonFatalError { Option(self).foreach(_.ask[Boolean](ExpireDeadHosts)) },
  0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)

There is no method called in the task that will throw InterruptdEException.

}
eventLoopThread.shutdownNow()
killExecutorThread.shutdownNow()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ private[spark] class StandaloneAppClient(

override def onStop(): Unit = {
if (registrationRetryTimer.get != null) {
registrationRetryTimer.get.cancel(true)
registrationRetryTimer.get.cancel(false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

      registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
        override def run(): Unit = {
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))

There is no method called in the task that will throw InterruptdEException.

}
registrationRetryThread.shutdownNow()
registerMasterFutures.get.foreach(_.cancel(true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ private[deploy] class Master(
applicationMetricsSystem.report()
// prevent the CompleteRecovery message sending to restarted master
if (recoveryCompletionTask != null) {
recoveryCompletionTask.cancel(true)
recoveryCompletionTask.cancel(false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

          recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(CompleteRecovery)
            }
          }, recoveryTimeoutMs, TimeUnit.MILLISECONDS)

There is no method called in the task that will throw InterruptdEException.

}
if (checkForWorkerTimeOutTask != null) {
checkForWorkerTimeOutTask.cancel(true)
checkForWorkerTimeOutTask.cancel(false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
      () => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
      0, workerTimeoutMs, TimeUnit.MILLISECONDS)

There is no method called in the task that will throw InterruptdEException.

}
forwardMessageThread.shutdownNow()
webUi.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ private[deploy] class Worker(
// We have exceeded the initial registration retry threshold
// All retries from now on should use a higher interval
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
registrationRetryTimer.foreach(_.cancel(true))
registrationRetryTimer.foreach(_.cancel(false))
registrationRetryTimer = Some(
Copy link
Contributor Author

@beliefer beliefer Mar 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

          registrationRetryTimer = Some(
            forwardMessageScheduler.scheduleAtFixedRate(
              () => Utils.tryLogNonFatalError { self.send(ReregisterWithMaster) },
              PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
              PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
              TimeUnit.SECONDS))

ditto

forwardMessageScheduler.scheduleAtFixedRate(
() => Utils.tryLogNonFatalError { self.send(ReregisterWithMaster) },
Expand All @@ -426,7 +426,7 @@ private[deploy] class Worker(
registerMasterFutures.foreach(_.cancel(true))
registerMasterFutures = null
}
registrationRetryTimer.foreach(_.cancel(true))
registrationRetryTimer.foreach(_.cancel(false))
registrationRetryTimer = None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private[netty] class NettyRpcEnv(
}
}, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
promise.future.onComplete { v =>
timeoutCancelable.cancel(true)
timeoutCancelable.cancel(false)
}(ThreadUtils.sameThread)
} catch {
case NonFatal(e) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class ExecutorPodsPollingSnapshotSource(
@Since("3.1.3")
def stop(): Unit = {
if (pollingFuture != null) {
pollingFuture.cancel(true)
pollingFuture.cancel(false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

pollingFuture = null
}
ThreadUtils.shutdown(pollingExecutor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ trait BroadcastExchangeLike extends Exchange {
case Some(r) => sparkContext.cancelJobsWithTag(this.jobTag, r)
case None => sparkContext.cancelJobsWithTag(this.jobTag)
}
this.relationFuture.cancel(true)
this.relationFuture.cancel(false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
}

Expand Down Expand Up @@ -257,7 +257,7 @@ case class BroadcastExchangeExec(
logError(log"Could not execute broadcast in ${MDC(TIMEOUT, timeout)} secs.", ex)
if (!relationFuture.isDone) {
sparkContext.cancelJobsWithTag(jobTag, "The corresponding broadcast query has failed.")
relationFuture.cancel(true)
relationFuture.cancel(false)
}
throw QueryExecutionErrors.executeBroadcastTimeoutError(timeout, Some(ex))
}
Expand Down