Skip to content

Commit

Permalink
Support reg app conf to server and avoid update committedBlocksId and…
Browse files Browse the repository at this point in the history
… cachedBlocksIds bitmaps for with memory client storageType
  • Loading branch information
maobaolong committed Oct 22, 2024
1 parent 87f9b6f commit eb87001
Show file tree
Hide file tree
Showing 17 changed files with 181 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,8 @@ public void registerShuffle(
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
RssProtos.MergeContext mergeContext) {}
RssProtos.MergeContext mergeContext,
Map<String, String> properties) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,8 @@ public void registerShuffle(
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
RssProtos.MergeContext mergeContext) {}
RssProtos.MergeContext mergeContext,
Map<String, String> properties) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import scala.Tuple2;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -1016,6 +1018,7 @@ protected void registerShuffleServers(
}
LOG.info("Start to register shuffleId {}", shuffleId);
long start = System.currentTimeMillis();
Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
serverToPartitionRanges.entrySet().stream()
.forEach(
entry -> {
Expand All @@ -1028,7 +1031,8 @@ protected void registerShuffleServers(
ShuffleDataDistributionType.NORMAL,
maxConcurrencyPerPartitionToWrite,
stageAttemptNumber,
null);
null,
sparkConfMap);
});
LOG.info(
"Finish register shuffleId {} with {} ms", shuffleId, (System.currentTimeMillis() - start));
Expand All @@ -1045,6 +1049,7 @@ protected void registerShuffleServers(
}
LOG.info("Start to register shuffleId[{}]", shuffleId);
long start = System.currentTimeMillis();
Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
Set<Map.Entry<ShuffleServerInfo, List<PartitionRange>>> entries =
serverToPartitionRanges.entrySet();
entries.stream()
Expand All @@ -1057,7 +1062,8 @@ protected void registerShuffleServers(
entry.getValue(),
remoteStorage,
dataDistributionType,
maxConcurrencyPerPartitionToWrite);
maxConcurrencyPerPartitionToWrite,
sparkConfMap);
});
LOG.info(
"Finish register shuffleId[{}] with {} ms",
Expand All @@ -1084,4 +1090,20 @@ public boolean isRssStageRetryForWriteFailureEnabled() {
/**
 * Reports the current value of the {@code rssStageRetryForFetchFailureEnabled} field.
 *
 * @return the flag exactly as stored on this instance
 */
public boolean isRssStageRetryForFetchFailureEnabled() {
  final boolean fetchFailureRetryEnabled = rssStageRetryForFetchFailureEnabled;
  return fetchFailureRetryEnabled;
}

/**
 * Exposes the {@link SparkConf} held by this instance.
 *
 * @return the {@code sparkConf} field itself; no copy is made
 */
@VisibleForTesting
public SparkConf getSparkConf() {
  final SparkConf currentConf = sparkConf;
  return currentConf;
}

/**
 * Copies every key/value pair of the given Spark configuration into a new mutable map.
 *
 * <p>All entries returned by {@code sparkConf.getAll()} are copied without filtering. If the
 * same key appears more than once, the later entry overwrites the earlier one.
 *
 * @param sparkConf the configuration to flatten
 * @return a fresh {@link HashMap} holding the configuration entries
 */
public Map<String, String> sparkConfToMap(SparkConf sparkConf) {
  Tuple2<String, String>[] confEntries = sparkConf.getAll();
  Map<String, String> flattened = new HashMap<>();
  for (int i = 0; i < confEntries.length; i++) {
    Tuple2<String, String> confEntry = confEntries[i];
    flattened.put(confEntry._1, confEntry._2);
  }
  return flattened;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -899,11 +899,6 @@ protected void registerCoordinator() {
this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX));
}

/**
 * Exposes the {@link SparkConf} held by this instance.
 *
 * @return the {@code sparkConf} field itself; no copy is made
 */
@VisibleForTesting
public SparkConf getSparkConf() {
  final SparkConf currentConf = sparkConf;
  return currentConf;
}

private synchronized void startHeartbeat() {
shuffleWriteClient.registerApplicationInfo(id.get(), heartbeatTimeout, user);
if (!heartbeatStarted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,8 @@ public void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
RssProtos.MergeContext mergeContext) {}
RssProtos.MergeContext mergeContext,
Map<String, String> properties) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,53 @@ default void registerShuffle(
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
0,
null);
null,
Collections.emptyMap());
}

/**
 * Convenience overload that registers a shuffle together with extra properties.
 *
 * <p>Delegates to the full {@code registerShuffle} overload with a stage attempt number of
 * {@code 0} and a {@code null} merge context.
 *
 * @param shuffleServerInfo the shuffle server to register with
 * @param appId the application id
 * @param shuffleId the shuffle id
 * @param partitionRanges the partition ranges passed through to the full overload
 * @param remoteStorage the remote storage information
 * @param dataDistributionType the shuffle data distribution type
 * @param maxConcurrencyPerPartitionToWrite the max write concurrency per partition
 * @param properties extra key/value properties forwarded unchanged
 */
default void registerShuffle(
    ShuffleServerInfo shuffleServerInfo,
    String appId,
    int shuffleId,
    List<PartitionRange> partitionRanges,
    RemoteStorageInfo remoteStorage,
    ShuffleDataDistributionType dataDistributionType,
    int maxConcurrencyPerPartitionToWrite,
    Map<String, String> properties) {
  // Defaults applied by this overload.
  final int defaultStageAttemptNumber = 0;
  final MergeContext noMergeContext = null;
  registerShuffle(
      shuffleServerInfo, appId, shuffleId, partitionRanges, remoteStorage,
      dataDistributionType, maxConcurrencyPerPartitionToWrite,
      defaultStageAttemptNumber, noMergeContext, properties);
}

/**
 * Convenience overload that registers a shuffle without any extra properties.
 *
 * <p>Delegates to the full {@code registerShuffle} overload, supplying
 * {@link Collections#emptyMap()} as the properties.
 *
 * @param shuffleServerInfo the shuffle server to register with
 * @param appId the application id
 * @param shuffleId the shuffle id
 * @param partitionRanges the partition ranges passed through to the full overload
 * @param remoteStorage the remote storage information
 * @param dataDistributionType the shuffle data distribution type
 * @param maxConcurrencyPerPartitionToWrite the max write concurrency per partition
 * @param stageAttemptNumber the stage attempt number
 * @param mergeContext the merge context, forwarded unchanged
 */
default void registerShuffle(
    ShuffleServerInfo shuffleServerInfo,
    String appId,
    int shuffleId,
    List<PartitionRange> partitionRanges,
    RemoteStorageInfo remoteStorage,
    ShuffleDataDistributionType dataDistributionType,
    int maxConcurrencyPerPartitionToWrite,
    int stageAttemptNumber,
    MergeContext mergeContext) {
  // This overload carries no properties, so an empty map is forwarded.
  final Map<String, String> noProperties = Collections.emptyMap();
  registerShuffle(
      shuffleServerInfo, appId, shuffleId, partitionRanges, remoteStorage,
      dataDistributionType, maxConcurrencyPerPartitionToWrite,
      stageAttemptNumber, mergeContext, noProperties);
}

void registerShuffle(
Expand All @@ -85,7 +131,8 @@ void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
MergeContext mergeContext);
MergeContext mergeContext,
Map<String, String> properties);

boolean sendCommit(
Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,8 @@ public void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
MergeContext mergeContext) {
MergeContext mergeContext,
Map<String, String> properties) {
String user = null;
try {
user = UserGroupInformation.getCurrentUser().getShortUserName();
Expand All @@ -583,7 +584,8 @@ public void registerShuffle(
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
stageAttemptNumber,
mergeContext);
mergeContext,
properties);
RssRegisterShuffleResponse response =
getShuffleServerClient(shuffleServerInfo).registerShuffle(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
RssProtos.MergeContext mergeContext) {}
RssProtos.MergeContext mergeContext,
Map<String, String> properties) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,6 @@ private Constants() {}
public static final String DRIVER_HOST = "driver.host";

public static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

// The literal prefix "spark.".
// NOTE(review): presumably meant for recognising Spark-originated configuration keys;
// confirm against the usages of this constant.
public static final String SPARK_RSS_CONFIG_PREFIX = "spark.";
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ private ShuffleRegisterResponse doRegisterShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
MergeContext mergeContext) {
MergeContext mergeContext,
Map<String, String> properties) {
ShuffleRegisterRequest.Builder reqBuilder = ShuffleRegisterRequest.newBuilder();
reqBuilder
.setAppId(appId)
Expand All @@ -207,7 +208,8 @@ private ShuffleRegisterResponse doRegisterShuffle(
.setShuffleDataDistribution(RssProtos.DataDistribution.valueOf(dataDistributionType.name()))
.setMaxConcurrencyPerPartitionToWrite(maxConcurrencyPerPartitionToWrite)
.addAllPartitionRanges(toShufflePartitionRanges(partitionRanges))
.setStageAttemptNumber(stageAttemptNumber);
.setStageAttemptNumber(stageAttemptNumber)
.putAllProperties(properties);
if (mergeContext != null) {
reqBuilder.setMergeContext(mergeContext);
}
Expand Down Expand Up @@ -484,7 +486,8 @@ public RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest requ
request.getDataDistributionType(),
request.getMaxConcurrencyPerPartitionToWrite(),
request.getStageAttemptNumber(),
request.getMergeContext());
request.getMergeContext(),
request.getProperties());

RssRegisterShuffleResponse response;
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.uniffle.client.request;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;

import org.apache.uniffle.common.PartitionRange;
Expand All @@ -39,7 +42,9 @@ public class RssRegisterShuffleRequest {
private int stageAttemptNumber;

private final MergeContext mergeContext;
private Map<String, String> properties;

@VisibleForTesting
public RssRegisterShuffleRequest(
String appId,
int shuffleId,
Expand All @@ -57,7 +62,8 @@ public RssRegisterShuffleRequest(
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
0,
null);
null,
Collections.emptyMap());
}

public RssRegisterShuffleRequest(
Expand All @@ -69,7 +75,8 @@ public RssRegisterShuffleRequest(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
MergeContext mergeContext) {
MergeContext mergeContext,
Map<String, String> properties) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionRanges = partitionRanges;
Expand All @@ -79,8 +86,10 @@ public RssRegisterShuffleRequest(
this.maxConcurrencyPerPartitionToWrite = maxConcurrencyPerPartitionToWrite;
this.stageAttemptNumber = stageAttemptNumber;
this.mergeContext = mergeContext;
this.properties = properties;
}

@VisibleForTesting
public RssRegisterShuffleRequest(
String appId,
int shuffleId,
Expand All @@ -97,7 +106,8 @@ public RssRegisterShuffleRequest(
dataDistributionType,
RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE.defaultValue(),
0,
null);
null,
Collections.emptyMap());
}

public RssRegisterShuffleRequest(
Expand All @@ -111,7 +121,8 @@ public RssRegisterShuffleRequest(
ShuffleDataDistributionType.NORMAL,
RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE.defaultValue(),
0,
null);
null,
Collections.emptyMap());
}

public String getAppId() {
Expand Down Expand Up @@ -149,4 +160,8 @@ public int getStageAttemptNumber() {
/**
 * Returns the merge context supplied when this request was constructed.
 *
 * @return the stored merge context; may be {@code null}, since the convenience constructors
 *     pass {@code null}
 */
public MergeContext getMergeContext() {
  return this.mergeContext;
}

/**
 * Returns the extra key/value properties attached to this register request.
 *
 * <p>Never returns {@code null}. The public constructor stores its {@code properties}
 * argument without validation, so a caller may have passed {@code null}; in that case an
 * empty immutable map is returned. This keeps consumers that forward the value to
 * null-hostile APIs (such as a protobuf builder's {@code putAll...} method) from failing
 * with a {@link NullPointerException}.
 *
 * @return the stored properties, or an empty map when none were provided
 */
public Map<String, String> getProperties() {
  if (properties == null) {
    return Collections.<String, String>emptyMap();
  }
  return properties;
}
}
1 change: 1 addition & 0 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ message ShuffleRegisterRequest {
int32 maxConcurrencyPerPartitionToWrite = 7;
int32 stageAttemptNumber = 8;
MergeContext mergeContext = 9;
map<string, string> properties = 10;
}

enum DataDistribution {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,14 @@ public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception {
throw new EventRetryException();
}
long endTime = System.currentTimeMillis();

// update some metrics for shuffle task
updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks());
ShuffleTaskInfo shuffleTaskInfo =
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());
if (shuffleTaskInfo == null || !shuffleTaskInfo.isClientStorageTypeWithMemory()) {
// With a memory client storage type, cachedBlockIds are never needed,
// since the client does not need to call the finish shuffle rpc.
// Otherwise, update some metrics for the shuffle task.
updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks());
}
if (isStorageAuditLogEnabled) {
AUDIT_LOGGER.info(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,11 @@ public class ShuffleServerConf extends RssBaseConf {
.booleanType()
.defaultValue(false)
.withDescription("Whether to enable app detail log");
// Boolean server option with key "rss.server.with.memory.storage.type.opt.enabled";
// defaults to true.
// NOTE(review): presumably gates the optimisation that skips block-id bookkeeping for
// applications whose client storage type includes memory; confirm where this option is read.
public static final ConfigOption<Boolean> SERVER_WITH_MEMORY_STORAGE_TYPE_OPTIMIZE_ENABLED =
ConfigOptions.key("rss.server.with.memory.storage.type.opt.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Whether to enable with memory storage type optimize");

public ShuffleServerConf() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ public void registerShuffle(
new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
user,
shuffleDataDistributionType,
maxConcurrencyPerPartitionToWrite);
maxConcurrencyPerPartitionToWrite,
req.getPropertiesMap());
if (StatusCode.SUCCESS == result
&& shuffleServer.isRemoteMergeEnable()
&& req.hasMergeContext()) {
Expand All @@ -338,7 +339,8 @@ public void registerShuffle(
new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
user,
shuffleDataDistributionType,
maxConcurrencyPerPartitionToWrite);
maxConcurrencyPerPartitionToWrite,
req.getPropertiesMap());
if (result == StatusCode.SUCCESS) {
result =
shuffleServer
Expand Down
Loading

0 comments on commit eb87001

Please sign in to comment.