Skip to content

Commit

Permalink
add support of catch the failure result fetch servers
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed Jun 25, 2024
1 parent afea817 commit 0eb3f8b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ public boolean reassignOnStageResubmit(
Object shuffleLock = rssStageResubmitManager.getOrCreateShuffleLock(shuffleId);
synchronized (shuffleLock) {
if (!rssStageResubmitManager.isStageAttemptRetried(shuffleId, stageId, stageAttemptNumber)) {
long start = System.currentTimeMillis();
int requiredShuffleServerNumber =
RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
Expand Down Expand Up @@ -698,9 +699,10 @@ public boolean reassignOnStageResubmit(
(StageAttemptShuffleHandleInfo) shuffleHandleInfoManager.get(shuffleId);
stageAttemptShuffleHandleInfo.replaceCurrentShuffleHandleInfo(shuffleHandleInfo);
LOG.info(
"The stage retry has been triggered successfully for the stageId: {}, attemptNumber: {}",
"The stage retry has been triggered successfully for the stageId: {}, attemptNumber: {}. It costs {}(ms)",
stageId,
stageAttemptNumber);
stageAttemptNumber,
System.currentTimeMillis() - start);
return true;
} else {
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ public Roaring64NavigableMap getShuffleResult(
boolean isSuccessful = false;
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
int successCnt = 0;
Set<ShuffleServerInfo> failureServers = new HashSet<>();
for (ShuffleServerInfo ssi : shuffleServerInfoSet) {
try {
RssGetShuffleResultResponse response =
Expand All @@ -812,6 +813,7 @@ public Roaring64NavigableMap getShuffleResult(
}
}
} catch (Exception e) {
failureServers.add(ssi);
LOG.warn(
"Get shuffle result is failed from "
+ ssi
Expand All @@ -824,7 +826,8 @@ public Roaring64NavigableMap getShuffleResult(
}
if (!isSuccessful) {
throw new RssFetchFailedException(
"Get shuffle result is failed for appId[" + appId + "], shuffleId[" + shuffleId + "]");
"Get shuffle result is failed for appId[" + appId + "], shuffleId[" + shuffleId + "]",
failureServers.toArray(new ShuffleServerInfo[0]));
}
return blockIdBitmap;
}
Expand All @@ -839,6 +842,7 @@ public Roaring64NavigableMap getShuffleResultForMultiPart(
PartitionDataReplicaRequirementTracking replicaRequirementTracking) {
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Set<Integer> allRequestedPartitionIds = new HashSet<>();
Set<ShuffleServerInfo> failureServers = new HashSet<>();
for (Map.Entry<ShuffleServerInfo, Set<Integer>> entry : serverToPartitions.entrySet()) {
ShuffleServerInfo shuffleServerInfo = entry.getKey();
Set<Integer> requestPartitions = Sets.newHashSet();
Expand All @@ -864,6 +868,7 @@ public Roaring64NavigableMap getShuffleResultForMultiPart(
}
}
} catch (Exception e) {
failureServers.add(shuffleServerInfo);
failedPartitions.addAll(requestPartitions);
LOG.warn(
"Get shuffle result is failed from "
Expand All @@ -883,7 +888,8 @@ public Roaring64NavigableMap getShuffleResultForMultiPart(
if (!isSuccessful) {
LOG.error("Failed to meet replica requirement: {}", replicaRequirementTracking);
throw new RssFetchFailedException(
"Get shuffle result is failed for appId[" + appId + "], shuffleId[" + shuffleId + "]");
"Get shuffle result is failed for appId[" + appId + "], shuffleId[" + shuffleId + "]",
failureServers.toArray(new ShuffleServerInfo[0]));
}
return blockIdBitmap;
}
Expand Down

0 comments on commit 0eb3f8b

Please sign in to comment.