Skip to content

Commit

Permalink
[apache#1796] fix(spark): Implicitly unregister map output on fetch f…
Browse files Browse the repository at this point in the history
…ailure
  • Loading branch information
zuston committed Jun 17, 2024
1 parent 8eff38e commit c9777e7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.deploy.SparkHadoopUtil;
import org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
Expand All @@ -44,7 +45,9 @@
import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.RssClientConfig;
Expand Down Expand Up @@ -371,6 +374,19 @@ public static RssException reportRssFetchFailedException(
rssFetchFailedException.getMessage());
RssReportShuffleFetchFailureResponse response = client.reportShuffleFetchFailure(req);
if (response.getReSubmitWholeStage()) {
TaskContext taskContext = TaskContext.get();
RssReassignServersRequest rssReassignServersRequest =
new RssReassignServersRequest(
taskContext.stageId(),
taskContext.stageAttemptNumber(),
shuffleId,
taskContext.numPartitions());
RssReassignServersResponse reassignServersResponse =
client.reassignShuffleServers(rssReassignServersRequest);
LOG.info(
"Reassign servers for stage retry due to the fetch failure, result: {}",
reassignServersResponse.isNeedReassign());

// since we are going to roll out the whole stage, mapIndex shouldn't matter, hence -1
// is provided.
FetchFailedException ffe =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
import scala.collection.AbstractIterator;
import scala.collection.Iterator;

import org.apache.spark.TaskContext;
import org.apache.spark.shuffle.FetchFailedException;
import org.apache.spark.shuffle.RssSparkShuffleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.client.api.ShuffleManagerClient;
import org.apache.uniffle.client.factory.ShuffleManagerClientFactory;
import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.exception.RssException;
Expand Down Expand Up @@ -120,6 +123,19 @@ private RssException generateFetchFailedIfNecessary(RssFetchFailedException e) {
e.getMessage());
RssReportShuffleFetchFailureResponse response = client.reportShuffleFetchFailure(req);
if (response.getReSubmitWholeStage()) {
TaskContext taskContext = TaskContext.get();
RssReassignServersRequest rssReassignServersRequest =
new RssReassignServersRequest(
taskContext.stageId(),
taskContext.stageAttemptNumber(),
builder.shuffleId,
taskContext.numPartitions());
RssReassignServersResponse reassignServersResponse =
client.reassignShuffleServers(rssReassignServersRequest);
LOG.info(
"Reassign servers for stage retry due to the fetch failure, result: {}",
reassignServersResponse.isNeedReassign());

// since we are going to roll out the whole stage, mapIndex shouldn't matter, hence -1 is
// provided.
FetchFailedException ffe =
Expand Down

0 comments on commit c9777e7

Please sign in to comment.