Skip to content

Commit

Permalink
[Bug] Long tail tasks in the Write Stage retry phase results in data …
Browse files Browse the repository at this point in the history
…loss.
  • Loading branch information
yl09099 committed Dec 19, 2024
1 parent 0481f21 commit 0c3ccff
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.spark.MapOutputTracker;
import org.apache.spark.MapOutputTrackerMaster;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkException;
import org.apache.spark.shuffle.RssShuffleHandle;
Expand Down Expand Up @@ -1131,4 +1132,8 @@ public Map<String, String> sparkConfToMap(SparkConf sparkConf) {
public ShuffleWriteClient getShuffleWriteClient() {
return shuffleWriteClient;
}

public void killAllTaskByStageId(int stageAttemptId, String cancelReason) {
SparkContext.getOrCreate().cancelStage(stageAttemptId, cancelReason);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,12 @@ MutableShuffleHandleInfo reassignOnBlockSendFailure(
* @return
*/
ShuffleWriteClient getShuffleWriteClient();

/**
* To cancel all tasks under current stage by stageId.
*
* @param stageAttemptId
* @param cancelReason
*/
void killAllTaskByStageId(int stageAttemptId, String cancelReason);
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,15 @@ public void reportShuffleWriteFailure(
shuffleManager.getMaxFetchFailures());
if (!shuffleServerWriterFailureRecord.isClearedMapTrackerBlock()) {
try {
// Cancel all tasks under the current StageId.
shuffleManager.killAllTaskByStageId(
stageAttemptId, "Write failure triggers server reallocation.");
// Clear the metadata of the completed task, otherwise some of the stage's data will
// be lost.
shuffleManager.unregisterAllMapOutput(shuffleId);
// Need to clear the mapStatus twice to prevent partition data loss due to the
// long-tail task performed before the stage retry.
shuffleManager.unregisterAllMapOutput(shuffleId);
// Deregister the shuffleId corresponding to the Shuffle Server.
shuffleManager.getShuffleWriteClient().unregisterShuffle(appId, shuffleId);
shuffleServerWriterFailureRecord.setClearedMapTrackerBlock(true);
Expand Down

0 comments on commit 0c3ccff

Please sign in to comment.