From 0c3ccff3b2b5b3bbe025a79ef785062f898ebe8e Mon Sep 17 00:00:00 2001 From: yl09099 Date: Thu, 19 Dec 2024 16:52:05 +0800 Subject: [PATCH] [Bug] Long tail tasks in the Write Stage retry phase result in data loss. --- .../uniffle/shuffle/manager/RssShuffleManagerBase.java | 5 +++++ .../shuffle/manager/RssShuffleManagerInterface.java | 8 ++++++++ .../shuffle/manager/ShuffleManagerGrpcService.java | 6 ++++++ 3 files changed, 19 insertions(+) diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java index c1fc5b68eb..b6e806d0e6 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java @@ -45,6 +45,7 @@ import org.apache.spark.MapOutputTracker; import org.apache.spark.MapOutputTrackerMaster; import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; import org.apache.spark.SparkEnv; import org.apache.spark.SparkException; import org.apache.spark.shuffle.RssShuffleHandle; @@ -1131,4 +1132,8 @@ public Map sparkConfToMap(SparkConf sparkConf) { public ShuffleWriteClient getShuffleWriteClient() { return shuffleWriteClient; } + + public void killAllTaskByStageId(int stageAttemptId, String cancelReason) { + SparkContext.getOrCreate().cancelStage(stageAttemptId, cancelReason); + } } diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java index 82967cc4a3..faac95ae21 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java @@ -96,4 +96,12 @@ 
MutableShuffleHandleInfo reassignOnBlockSendFailure( * @return */ ShuffleWriteClient getShuffleWriteClient(); + + /** + * Cancels all tasks of the stage identified by the given ID. + * + * @param stageAttemptId ID of the stage to cancel, as passed to SparkContext#cancelStage + * @param cancelReason reason reported for the cancellation + */ + void killAllTaskByStageId(int stageAttemptId, String cancelReason); } diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java index 667b6a9056..66a6400ced 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java @@ -115,9 +115,15 @@ public void reportShuffleWriteFailure( shuffleManager.getMaxFetchFailures()); if (!shuffleServerWriterFailureRecord.isClearedMapTrackerBlock()) { try { + // Cancel all tasks under the current StageId. + shuffleManager.killAllTaskByStageId( + stageAttemptId, "Write failure triggers server reallocation."); // Clear the metadata of the completed task, otherwise some of the stage's data will // be lost. shuffleManager.unregisterAllMapOutput(shuffleId); + // Need to clear the mapStatus twice to prevent partition data loss due to the + // long-tail task performed before the stage retry. + shuffleManager.unregisterAllMapOutput(shuffleId); // Deregister the shuffleId corresponding to the Shuffle Server. shuffleManager.getShuffleWriteClient().unregisterShuffle(appId, shuffleId); shuffleServerWriterFailureRecord.setClearedMapTrackerBlock(true);