Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2197] Support reg app conf to server and avoid update committed/cached blockIds bitmap #2196

Merged
merged 6 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,8 @@ public void registerShuffle(
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
RssProtos.MergeContext mergeContext) {}
RssProtos.MergeContext mergeContext,
Map<String, String> properties) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,8 @@ public void registerShuffle(
ShuffleDataDistributionType distributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
RssProtos.MergeContext mergeContext) {}
RssProtos.MergeContext mergeContext,
Map<String, String> properties) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import scala.Tuple2;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -1016,6 +1018,7 @@ protected void registerShuffleServers(
}
LOG.info("Start to register shuffleId {}", shuffleId);
long start = System.currentTimeMillis();
Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
serverToPartitionRanges.entrySet().stream()
.forEach(
entry -> {
Expand All @@ -1028,7 +1031,8 @@ protected void registerShuffleServers(
ShuffleDataDistributionType.NORMAL,
maxConcurrencyPerPartitionToWrite,
stageAttemptNumber,
null);
null,
sparkConfMap);
});
LOG.info(
"Finish register shuffleId {} with {} ms", shuffleId, (System.currentTimeMillis() - start));
Expand All @@ -1045,6 +1049,7 @@ protected void registerShuffleServers(
}
LOG.info("Start to register shuffleId[{}]", shuffleId);
long start = System.currentTimeMillis();
Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
Set<Map.Entry<ShuffleServerInfo, List<PartitionRange>>> entries =
serverToPartitionRanges.entrySet();
entries.stream()
Expand All @@ -1057,7 +1062,8 @@ protected void registerShuffleServers(
entry.getValue(),
remoteStorage,
dataDistributionType,
maxConcurrencyPerPartitionToWrite);
maxConcurrencyPerPartitionToWrite,
sparkConfMap);
});
LOG.info(
"Finish register shuffleId[{}] with {} ms",
Expand All @@ -1084,4 +1090,20 @@ public boolean isRssStageRetryForWriteFailureEnabled() {
public boolean isRssStageRetryForFetchFailureEnabled() {
return rssStageRetryForFetchFailureEnabled;
}

@VisibleForTesting
public SparkConf getSparkConf() {
return sparkConf;
}

public Map<String, String> sparkConfToMap(SparkConf sparkConf) {
Map<String, String> map = new HashMap<>();

for (Tuple2<String, String> tuple : sparkConf.getAll()) {
String key = tuple._1;
map.put(key, tuple._2);
}

return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -899,11 +899,6 @@ protected void registerCoordinator() {
this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX));
}

@VisibleForTesting
public SparkConf getSparkConf() {
return sparkConf;
}

private synchronized void startHeartbeat() {
shuffleWriteClient.registerApplicationInfo(id.get(), heartbeatTimeout, user);
if (!heartbeatStarted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,8 @@ public void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
RssProtos.MergeContext mergeContext) {}
RssProtos.MergeContext mergeContext,
Map<String, String> properties) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,53 @@ default void registerShuffle(
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
0,
null);
null,
Collections.emptyMap());
}

default void registerShuffle(
ShuffleServerInfo shuffleServerInfo,
String appId,
int shuffleId,
List<PartitionRange> partitionRanges,
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
Map<String, String> properties) {
registerShuffle(
shuffleServerInfo,
appId,
shuffleId,
partitionRanges,
remoteStorage,
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
0,
null,
properties);
}

default void registerShuffle(
ShuffleServerInfo shuffleServerInfo,
String appId,
int shuffleId,
List<PartitionRange> partitionRanges,
RemoteStorageInfo remoteStorage,
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
MergeContext mergeContext) {
registerShuffle(
shuffleServerInfo,
appId,
shuffleId,
partitionRanges,
remoteStorage,
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
stageAttemptNumber,
mergeContext,
Collections.emptyMap());
}

void registerShuffle(
Expand All @@ -85,7 +131,8 @@ void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
MergeContext mergeContext);
MergeContext mergeContext,
Map<String, String> properties);

boolean sendCommit(
Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,8 @@ public void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
MergeContext mergeContext) {
MergeContext mergeContext,
Map<String, String> properties) {
String user = null;
try {
user = UserGroupInformation.getCurrentUser().getShortUserName();
Expand All @@ -583,7 +584,8 @@ public void registerShuffle(
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
stageAttemptNumber,
mergeContext);
mergeContext,
properties);
RssRegisterShuffleResponse response =
getShuffleServerClient(shuffleServerInfo).registerShuffle(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public void registerShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
RssProtos.MergeContext mergeContext) {}
RssProtos.MergeContext mergeContext,
Map<String, String> properties) {}

@Override
public boolean sendCommit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,6 @@ private Constants() {}
public static final String DRIVER_HOST = "driver.host";

public static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

public static final String SPARK_RSS_CONFIG_PREFIX = "spark.";
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ private ShuffleRegisterResponse doRegisterShuffle(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
MergeContext mergeContext) {
MergeContext mergeContext,
Map<String, String> properties) {
ShuffleRegisterRequest.Builder reqBuilder = ShuffleRegisterRequest.newBuilder();
reqBuilder
.setAppId(appId)
Expand All @@ -207,7 +208,8 @@ private ShuffleRegisterResponse doRegisterShuffle(
.setShuffleDataDistribution(RssProtos.DataDistribution.valueOf(dataDistributionType.name()))
.setMaxConcurrencyPerPartitionToWrite(maxConcurrencyPerPartitionToWrite)
.addAllPartitionRanges(toShufflePartitionRanges(partitionRanges))
.setStageAttemptNumber(stageAttemptNumber);
.setStageAttemptNumber(stageAttemptNumber)
.putAllProperties(properties);
if (mergeContext != null) {
reqBuilder.setMergeContext(mergeContext);
}
Expand Down Expand Up @@ -484,7 +486,8 @@ public RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest requ
request.getDataDistributionType(),
request.getMaxConcurrencyPerPartitionToWrite(),
request.getStageAttemptNumber(),
request.getMergeContext());
request.getMergeContext(),
request.getProperties());

RssRegisterShuffleResponse response;
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.uniffle.client.request;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;

import org.apache.uniffle.common.PartitionRange;
Expand All @@ -39,7 +42,9 @@ public class RssRegisterShuffleRequest {
private int stageAttemptNumber;

private final MergeContext mergeContext;
private Map<String, String> properties;

@VisibleForTesting
public RssRegisterShuffleRequest(
String appId,
int shuffleId,
Expand All @@ -57,7 +62,8 @@ public RssRegisterShuffleRequest(
dataDistributionType,
maxConcurrencyPerPartitionToWrite,
0,
null);
null,
Collections.emptyMap());
}

public RssRegisterShuffleRequest(
Expand All @@ -69,7 +75,8 @@ public RssRegisterShuffleRequest(
ShuffleDataDistributionType dataDistributionType,
int maxConcurrencyPerPartitionToWrite,
int stageAttemptNumber,
MergeContext mergeContext) {
MergeContext mergeContext,
Map<String, String> properties) {
this.appId = appId;
this.shuffleId = shuffleId;
this.partitionRanges = partitionRanges;
Expand All @@ -79,8 +86,10 @@ public RssRegisterShuffleRequest(
this.maxConcurrencyPerPartitionToWrite = maxConcurrencyPerPartitionToWrite;
this.stageAttemptNumber = stageAttemptNumber;
this.mergeContext = mergeContext;
this.properties = properties;
}

@VisibleForTesting
public RssRegisterShuffleRequest(
String appId,
int shuffleId,
Expand All @@ -97,7 +106,8 @@ public RssRegisterShuffleRequest(
dataDistributionType,
RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE.defaultValue(),
0,
null);
null,
Collections.emptyMap());
}

public RssRegisterShuffleRequest(
Expand All @@ -111,7 +121,8 @@ public RssRegisterShuffleRequest(
ShuffleDataDistributionType.NORMAL,
RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE.defaultValue(),
0,
null);
null,
Collections.emptyMap());
}

public String getAppId() {
Expand Down Expand Up @@ -149,4 +160,8 @@ public int getStageAttemptNumber() {
public MergeContext getMergeContext() {
return mergeContext;
}

public Map<String, String> getProperties() {
return properties;
}
}
1 change: 1 addition & 0 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ message ShuffleRegisterRequest {
int32 maxConcurrencyPerPartitionToWrite = 7;
int32 stageAttemptNumber = 8;
MergeContext mergeContext = 9;
map<string, string> properties = 10;
}

enum DataDistribution {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandlerWrapper;
import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
import static org.apache.uniffle.server.ShuffleServerMetrics.COMMITTED_BLOCK_COUNT;
Expand All @@ -62,6 +63,7 @@ public class ShuffleFlushManager {
private final String storageType;
private final int storageDataReplica;
private final ShuffleServerConf shuffleServerConf;
private final boolean storageTypeWithMemory;
private Configuration hadoopConf;
// appId -> shuffleId -> committed shuffle blockIds
private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds =
Expand Down Expand Up @@ -101,6 +103,7 @@ public ShuffleFlushManager(
.mapToLong(bitmap -> bitmap.getLongCardinality())
.sum(),
2 * 60 * 1000L /* 2 minutes */);
this.storageTypeWithMemory = StorageType.withMemory(StorageType.valueOf(storageType));
}

public void addToFlushQueue(ShuffleDataFlushEvent event) {
Expand Down Expand Up @@ -194,11 +197,14 @@ public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception {
throw new EventRetryException();
}
long endTime = System.currentTimeMillis();

// update some metrics for shuffle task
updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks());
ShuffleTaskInfo shuffleTaskInfo =
shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());
if (shuffleTaskInfo == null || !storageTypeWithMemory) {
// With memory storage type should never need cachedBlockIds,
// since client do not need call finish shuffle rpc
// update some metrics for shuffle task
updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks());
}
if (isStorageAuditLogEnabled) {
AUDIT_LOGGER.info(
String.format(
Expand Down
Loading
Loading