[SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future specified mayInterruptIfRunning with true #50209

Open
wants to merge 1 commit into
base: master

beliefer
Contributor

@beliefer beliefer commented Mar 7, 2025

What changes were proposed in this pull request?

This PR proposes to fix places that cancel a Future with mayInterruptIfRunning = true even though the task cannot respond to interruption.

Why are the changes needed?

Currently, Spark holds Futures and then cancels them with mayInterruptIfRunning = true in many places.
mayInterruptIfRunning = true only has an effect if the Runnable can respond to a thread interrupt. Many places use it incorrectly.
The correct usage is shown below.

val future = threadPool.submit(new Runnable {
  override def run(): Unit = try {
    // do something that responds to thread interruption or may throw InterruptedException.
  } catch {
    case ie: InterruptedException =>
      Thread.currentThread().interrupt()
  }
})

future.cancel(true)

Two correct examples are shown below.
First, the task supports interruption.

Future<?> future = executor.submit(() -> {
    try {
        while (!Thread.currentThread().isInterrupted()) {
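            // do something. In Java, the try body must contain a call that can throw
            // InterruptedException (for example Thread.sleep), or this catch will not compile.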
        }
    } catch (InterruptedException e) {
        log("Task interrupted, exiting...");
        Thread.currentThread().interrupt(); // restore interrupt state
    }
});

future.cancel(true);

Second, the task body contains an operation that can respond to an interrupt.

Future<?> future = executor.submit(() -> {
    try {
        // do something.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String data = queue.take(); // This blocking call responds to interruption.
        // do something.
    } catch (InterruptedException e) {
        log("Task interrupted, exiting...");
        Thread.currentThread().interrupt(); // restore interrupt state
    }
});

future.cancel(true);
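
To make the difference between the two flag values concrete, here is a minimal, self-contained sketch in Java. It is an illustration only, not code from Spark: the class name `BlockingCancelDemo` and the method `taskExitsAfterCancel` are hypothetical, and the 500 ms wait is an arbitrary choice. It submits a task that blocks in `queue.take()` and reports whether the task exits shortly after `cancel(mayInterrupt)`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Spark.
final class BlockingCancelDemo {

    /** Returns true if a task blocked in take() exits within 500 ms of cancel(mayInterrupt). */
    static boolean taskExitsAfterCancel(boolean mayInterrupt) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    queue.take(); // nothing is ever offered, so this blocks until interrupted
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore interrupt state
                } finally {
                    exited.countDown();
                }
            });
            started.await();             // make sure the task is actually running
            future.cancel(mayInterrupt); // true interrupts the worker thread, false does not
            return exited.await(500, TimeUnit.MILLISECONDS);
        } finally {
            executor.shutdownNow(); // interrupts the worker so the JVM can exit either way
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("cancel(true):  task exited = " + taskExitsAfterCancel(true));
        System.out.println("cancel(false): task exited = " + taskExitsAfterCancel(false));
    }
}
```

Under these assumptions, only `cancel(true)` unblocks the task. After `cancel(false)` the Future is marked cancelled, but the worker stays blocked until the executor is shut down.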

Does this PR introduce any user-facing change?

'No'.

How was this patch tested?

GA

Was this patch authored or co-authored using generative AI tooling?

'No'.

@beliefer beliefer changed the title [WIP][SPARK-51436][SQL] Change the mayInterruptIfRunning from true to false [WIP][SPARK-51436][CORE][SQL][K8s][SS] Change the mayInterruptIfRunning from true to false Mar 7, 2025
@beliefer beliefer changed the title [WIP][SPARK-51436][CORE][SQL][K8s][SS] Change the mayInterruptIfRunning from true to false [WIP][SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future which specify mayInterruptIfRunning with true Mar 7, 2025
@beliefer beliefer changed the title [WIP][SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future which specify mayInterruptIfRunning with true [WIP][SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future specified mayInterruptIfRunning with true Mar 7, 2025
@@ -250,7 +250,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)

override def onStop(): Unit = {
if (timeoutCheckingTask != null) {
- timeoutCheckingTask.cancel(true)
+ timeoutCheckingTask.cancel(false)

Contributor Author

timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(
  () => Utils.tryLogNonFatalError { Option(self).foreach(_.ask[Boolean](ExpireDeadHosts)) },
  0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)

No method called in this task throws InterruptedException.
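
As a rough check that `cancel(false)` is enough to stop a short periodic task like this one, the following Java sketch schedules a fixed-rate task, cancels it with mayInterruptIfRunning = false, and counts any runs that happen afterwards. The names (`PeriodicCancelDemo`, `runsAfterCancel`) and the timings are hypothetical and not taken from Spark. A run already in progress at cancel time is allowed to finish, so the sketch waits briefly before taking its baseline.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Spark.
final class PeriodicCancelDemo {

    /** Returns how many times the periodic task ran after cancel(false) took effect. */
    static int runsAfterCancel() throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch firstRuns = new CountDownLatch(3);
        try {
            ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
                runs.incrementAndGet(); // short, non-blocking body, like sending a message
                firstRuns.countDown();
            }, 0, 10, TimeUnit.MILLISECONDS);

            firstRuns.await();  // let the task run a few times
            task.cancel(false); // no interrupt; only prevents future executions

            Thread.sleep(50);   // let any in-flight run finish
            int baseline = runs.get();
            Thread.sleep(200);  // many periods would have elapsed by now
            return runs.get() - baseline;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("runs after cancel(false): " + runsAfterCancel());
    }
}
```

For a body this short, the sketch suggests the flag makes no observable difference: the periodic task stops being rescheduled either way.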

@@ -139,7 +139,7 @@ private[consumer] class FetchedDataPool(
}

def reset(): Unit = synchronized {
- scheduled.foreach(_.cancel(true))
+ scheduled.foreach(_.cancel(false))

Contributor Author

      val future = executorService.scheduleAtFixedRate(() => {
        Utils.tryLogNonFatalError(removeIdleFetchedData())
      }, 0, evictorThreadRunIntervalMillis, TimeUnit.MILLISECONDS)
      Some(future)

No method called in this task throws InterruptedException.

Member

Why change this, though? If it were interruptible, we'd want to interrupt for sure.
What is the issue if you try to interrupt something and it doesn't do anything?

Contributor Author

These tasks cannot respond to an interrupt even if we want to interrupt them. cancel(true) interrupts the thread, but that has no effect on tasks that never check for or respond to interruption. In addition, interrupting a thread has some extra overhead.
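
The point that `cancel(true)` has no effect on a task that never responds to interruption can be illustrated with a small Java sketch. The names (`NonInterruptibleDemo`, `stillRunningAfterCancelTrue`) and the 200 ms wait are hypothetical and not taken from Spark. The task spins on its own stop flag and never checks the thread's interrupt status, so the interrupt sent by `cancel(true)` goes unnoticed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of Spark.
final class NonInterruptibleDemo {

    /** Returns true if the task is still running 200 ms after cancel(true) succeeded. */
    static boolean stillRunningAfterCancelTrue() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);
        AtomicBoolean stop = new AtomicBoolean(false); // the only signal this task listens to
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                while (!stop.get()) {
                    Thread.yield(); // busy work that never looks at the interrupt flag
                }
                exited.countDown();
            });
            started.await();
            boolean cancelled = future.cancel(true); // interrupts the worker thread
            boolean exitedInTime = exited.await(200, TimeUnit.MILLISECONDS);
            // The Future reports cancelled, yet the task body keeps running.
            return cancelled && future.isCancelled() && !exitedInTime;
        } finally {
            stop.set(true);         // now let the task finish
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("still running after cancel(true): " + stillRunningAfterCancelTrue());
    }
}
```

In this sketch the Future is marked cancelled immediately, but the task body only stops once its own flag is set, which is the behaviour described above.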

Member

If true, then this change doesn't do anything, right? I wonder what problem this change is solving.

Contributor Author

@beliefer beliefer Mar 9, 2025

This PR aims to avoid the overhead of these ineffective thread interruptions.

Member

Is there overhead? It just sets the interrupted status of the thread.
I'm worried that, if the code changed in some way that would make it interruptible, then we lose the ability to interrupt on cancel.

Contributor Author

On the one hand, I want to avoid the overhead of the ineffective thread interruption, even if that overhead is small.
On the other hand, I suggest using interrupts judiciously: use the interrupt mechanism only when it is needed, to avoid over-reliance on it.

@@ -277,7 +277,7 @@ private[spark] class StandaloneAppClient(

override def onStop(): Unit = {
if (registrationRetryTimer.get != null) {
- registrationRetryTimer.get.cancel(true)
+ registrationRetryTimer.get.cancel(false)

Contributor Author

      registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
        override def run(): Unit = {
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))

No method called in this task throws InterruptedException.

@@ -214,10 +214,10 @@ private[deploy] class Master(
applicationMetricsSystem.report()
// prevent the CompleteRecovery message sending to restarted master
if (recoveryCompletionTask != null) {
- recoveryCompletionTask.cancel(true)
+ recoveryCompletionTask.cancel(false)

Contributor Author

          recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(CompleteRecovery)
            }
          }, recoveryTimeoutMs, TimeUnit.MILLISECONDS)

No method called in this task throws InterruptedException.

}
if (checkForWorkerTimeOutTask != null) {
- checkForWorkerTimeOutTask.cancel(true)
+ checkForWorkerTimeOutTask.cancel(false)

Contributor Author

    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
      () => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
      0, workerTimeoutMs, TimeUnit.MILLISECONDS)

No method called in this task throws InterruptedException.

@@ -403,7 +403,7 @@ private[deploy] class Worker(
// We have exceeded the initial registration retry threshold
// All retries from now on should use a higher interval
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
- registrationRetryTimer.foreach(_.cancel(true))
+ registrationRetryTimer.foreach(_.cancel(false))
registrationRetryTimer = Some(

Contributor Author

@beliefer beliefer Mar 8, 2025

          registrationRetryTimer = Some(
            forwardMessageScheduler.scheduleAtFixedRate(
              () => Utils.tryLogNonFatalError { self.send(ReregisterWithMaster) },
              PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
              PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
              TimeUnit.SECONDS))

ditto

@@ -63,7 +63,7 @@ class ExecutorPodsPollingSnapshotSource(
@Since("3.1.3")
def stop(): Unit = {
if (pollingFuture != null) {
- pollingFuture.cancel(true)
+ pollingFuture.cancel(false)

Contributor Author

ditto.

@@ -108,7 +108,7 @@ trait BroadcastExchangeLike extends Exchange {
case Some(r) => sparkContext.cancelJobsWithTag(this.jobTag, r)
case None => sparkContext.cancelJobsWithTag(this.jobTag)
}
- this.relationFuture.cancel(true)
+ this.relationFuture.cancel(false)

Contributor Author

ditto

@beliefer beliefer force-pushed the SPARK-51436 branch 3 times, most recently from bf2523c to 4f26f76 Compare March 8, 2025 09:21
@beliefer beliefer changed the title [WIP][SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future specified mayInterruptIfRunning with true [SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future specified mayInterruptIfRunning with true Mar 8, 2025
@beliefer
Contributor Author

beliefer commented Mar 8, 2025

ping @srowen @dongjoon-hyun @LuciferYang
