From a6dbeaeb52f05d45a7d510c17675a3d4b922b518 Mon Sep 17 00:00:00 2001 From: beliefer Date: Fri, 7 Mar 2025 19:04:38 +0800 Subject: [PATCH] [SPARK-51436][SQL] Change the mayInterruptIfRunning from true to false --- .../apache/spark/sql/kafka010/consumer/FetchedDataPool.scala | 2 +- core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 2 +- .../org/apache/spark/deploy/client/StandaloneAppClient.scala | 2 +- .../main/scala/org/apache/spark/deploy/master/Master.scala | 4 ++-- .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 4 ++-- .../main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala | 2 +- .../cluster/k8s/ExecutorPodsPollingSnapshotSource.scala | 2 +- .../spark/sql/execution/exchange/BroadcastExchangeExec.scala | 4 ++-- 8 files changed, 11 insertions(+), 11 deletions(-) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala index 9f68cb6fd0882..38324a8a364c1 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala @@ -139,7 +139,7 @@ private[consumer] class FetchedDataPool( } def reset(): Unit = synchronized { - scheduled.foreach(_.cancel(true)) + scheduled.foreach(_.cancel(false)) cache.clear() numTotalElements.reset() diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 92aea5959aab7..c55af95cccf46 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -250,7 +250,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) override def onStop(): Unit = { if (timeoutCheckingTask != null) { - timeoutCheckingTask.cancel(true) + timeoutCheckingTask.cancel(false) } eventLoopThread.shutdownNow() killExecutorThread.shutdownNow() diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala index b34e5c408c3be..349de24ca1ffb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala @@ -277,7 +277,7 @@ private[spark] class StandaloneAppClient( override def onStop(): Unit = { if (registrationRetryTimer.get != null) { - registrationRetryTimer.get.cancel(true) + registrationRetryTimer.get.cancel(false) } registrationRetryThread.shutdownNow() registerMasterFutures.get.foreach(_.cancel(true)) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7d15744de6b45..01365ebfefabc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -214,10 +214,10 @@ private[deploy] class Master( applicationMetricsSystem.report() // prevent the CompleteRecovery message sending to restarted master if (recoveryCompletionTask != null) { - recoveryCompletionTask.cancel(true) + recoveryCompletionTask.cancel(false) } if (checkForWorkerTimeOutTask != null) { - checkForWorkerTimeOutTask.cancel(true) + checkForWorkerTimeOutTask.cancel(false) } forwardMessageThread.shutdownNow() webUi.stop() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index b2ec23887a400..cb4ce29adb7f4 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -403,7 +403,7 @@ private[deploy] class Worker( // We have exceeded the initial registration retry threshold // All retries from now on should use a higher interval if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) { - registrationRetryTimer.foreach(_.cancel(true)) + registrationRetryTimer.foreach(_.cancel(false)) registrationRetryTimer = Some( forwardMessageScheduler.scheduleAtFixedRate( () => Utils.tryLogNonFatalError { self.send(ReregisterWithMaster) }, @@ -426,7 +426,7 @@ private[deploy] class Worker( registerMasterFutures.foreach(_.cancel(true)) registerMasterFutures = null } - registrationRetryTimer.foreach(_.cancel(true)) + registrationRetryTimer.foreach(_.cancel(false)) registrationRetryTimer = None } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index c2688610fe8b1..73431adba3cd4 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -269,7 +269,7 @@ private[netty] class NettyRpcEnv( } }, timeout.duration.toNanos, TimeUnit.NANOSECONDS) promise.future.onComplete { v => - timeoutCancelable.cancel(true) + timeoutCancelable.cancel(false) }(ThreadUtils.sameThread) } catch { case NonFatal(e) => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala index 4ed34ec3e4c00..64645106d2630 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala @@ -63,7 +63,7 @@ class ExecutorPodsPollingSnapshotSource( @Since("3.1.3") def stop(): Unit = { if (pollingFuture != null) { - pollingFuture.cancel(true) + pollingFuture.cancel(false) pollingFuture = null } ThreadUtils.shutdown(pollingExecutor) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 2565a14cef90b..91c05c3f0919a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -108,7 +108,7 @@ trait BroadcastExchangeLike extends Exchange { case Some(r) => sparkContext.cancelJobsWithTag(this.jobTag, r) case None => sparkContext.cancelJobsWithTag(this.jobTag) } - this.relationFuture.cancel(true) + this.relationFuture.cancel(false) } } @@ -257,7 +257,7 @@ case class BroadcastExchangeExec( logError(log"Could not execute broadcast in ${MDC(TIMEOUT, timeout)} secs.", ex) if (!relationFuture.isDone) { sparkContext.cancelJobsWithTag(jobTag, "The corresponding broadcast query has failed.") - relationFuture.cancel(true) + relationFuture.cancel(false) } throw QueryExecutionErrors.executeBroadcastTimeoutError(timeout, Some(ex)) }