From eb87001cb7c8640a8b1d939bf2f23e3f7861b624 Mon Sep 17 00:00:00 2001
From: baoloongmao
Date: Tue, 15 Oct 2024 14:45:05 +0800
Subject: [PATCH] Support registering app conf to the server and avoid updating
 committedBlockIds and cachedBlockIds bitmaps for clients with memory storage
 type
---
 .../mapred/SortWriteBufferManagerTest.java    |  3 +-
 .../mapreduce/task/reduce/FetcherTest.java    |  3 +-
 .../manager/RssShuffleManagerBase.java        | 26 +++++++++-
 .../spark/shuffle/RssShuffleManager.java      |  5 --
 .../sort/buffer/WriteBufferManagerTest.java   |  3 +-
 .../client/api/ShuffleWriteClient.java        | 51 ++++++++++++++++++-
 .../client/impl/ShuffleWriteClientImpl.java   |  6 ++-
 .../reader/MockedShuffleWriteClient.java      |  3 +-
 .../apache/uniffle/common/util/Constants.java |  2 +
 .../impl/grpc/ShuffleServerGrpcClient.java    |  9 ++--
 .../request/RssRegisterShuffleRequest.java    | 23 +++++++--
 proto/src/main/proto/Rss.proto                |  1 +
 .../uniffle/server/ShuffleFlushManager.java   |  9 ++--
 .../uniffle/server/ShuffleServerConf.java     |  5 ++
 .../server/ShuffleServerGrpcService.java      |  6 ++-
 .../uniffle/server/ShuffleTaskInfo.java       | 32 ++++++++++++
 .../uniffle/server/ShuffleTaskManager.java    | 30 +++++++----
 17 files changed, 181 insertions(+), 36 deletions(-)

diff --git a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
index 0385cb58e0..cdc8c5bea4 100644
--- a/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
+++ b/client-mr/core/src/test/java/org/apache/hadoop/mapred/SortWriteBufferManagerTest.java
@@ -723,7 +723,8 @@ public void registerShuffle(
         ShuffleDataDistributionType distributionType,
         int maxConcurrencyPerPartitionToWrite,
         int stageAttemptNumber,
-        RssProtos.MergeContext mergeContext) {}
+        RssProtos.MergeContext mergeContext,
+        Map<String, String> properties) {}
 
     @Override
     public boolean sendCommit(
diff --git a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
index d2aaebe045..82a98a84ef 100644
--- a/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
+++ b/client-mr/core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/FetcherTest.java
@@ -508,7 +508,8 @@ public void registerShuffle(
       ShuffleDataDistributionType distributionType,
       int maxConcurrencyPerPartitionToWrite,
       int stageAttemptNumber,
-      RssProtos.MergeContext mergeContext) {}
+      RssProtos.MergeContext mergeContext,
+      Map<String, String> properties) {}
 
   @Override
   public boolean sendCommit(
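Note: the two test stubs above only widen their registerShuffle signature; the interesting part is what the new Map<String, String> properties argument carries. Below is a minimal, self-contained sketch of the kind of payload a client would pass. The keys and values are illustrative assumptions, not taken from this patch; the real map is whatever the client application configuration holds.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only: builds the kind of key/value map that now rides along
    // with shuffle registration, e.g. a dump of the Spark conf.
    public class RegisterPropertiesExample {
      public static Map<String, String> buildExampleProperties() {
        Map<String, String> properties = new HashMap<>();
        // Assumed example keys; the server only keeps entries containing ".rss.".
        properties.put("spark.rss.storage.type", "MEMORY_LOCALFILE");
        properties.put("spark.rss.client.retry.max", "3");
        return properties;
      }

      public static void main(String[] args) {
        System.out.println(buildExampleProperties());
      }
    }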
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index 47f9e271de..11d81ed31d 100644
--- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -34,6 +34,8 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -1016,6 +1018,7 @@ protected void registerShuffleServers(
     }
     LOG.info("Start to register shuffleId {}", shuffleId);
     long start = System.currentTimeMillis();
+    Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
     serverToPartitionRanges.entrySet().stream()
         .forEach(
             entry -> {
@@ -1028,7 +1031,8 @@ protected void registerShuffleServers(
                   ShuffleDataDistributionType.NORMAL,
                   maxConcurrencyPerPartitionToWrite,
                   stageAttemptNumber,
-                  null);
+                  null,
+                  sparkConfMap);
             });
     LOG.info(
         "Finish register shuffleId {} with {} ms", shuffleId, (System.currentTimeMillis() - start));
@@ -1045,6 +1049,7 @@ protected void registerShuffleServers(
     }
     LOG.info("Start to register shuffleId[{}]", shuffleId);
     long start = System.currentTimeMillis();
+    Map<String, String> sparkConfMap = sparkConfToMap(getSparkConf());
     Set<Map.Entry<ShuffleServerInfo, List<PartitionRange>>> entries =
         serverToPartitionRanges.entrySet();
     entries.stream()
@@ -1057,7 +1062,8 @@ protected void registerShuffleServers(
                   entry.getValue(),
                   remoteStorage,
                   dataDistributionType,
-                  maxConcurrencyPerPartitionToWrite);
+                  maxConcurrencyPerPartitionToWrite,
+                  sparkConfMap);
             });
     LOG.info(
         "Finish register shuffleId[{}] with {} ms",
@@ -1084,4 +1090,20 @@ public boolean isRssStageRetryForWriteFailureEnabled() {
   public boolean isRssStageRetryForFetchFailureEnabled() {
     return rssStageRetryForFetchFailureEnabled;
   }
+
+  @VisibleForTesting
+  public SparkConf getSparkConf() {
+    return sparkConf;
+  }
+
+  public Map<String, String> sparkConfToMap(SparkConf sparkConf) {
+    Map<String, String> map = new HashMap<>();
+
+    for (Tuple2<String, String> tuple : sparkConf.getAll()) {
+      String key = tuple._1;
+      map.put(key, tuple._2);
+    }
+
+    return map;
+  }
 }
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 70369f503a..95c89bd29d 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -899,11 +899,6 @@ protected void registerCoordinator() {
         this.sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX));
   }
 
-  @VisibleForTesting
-  public SparkConf getSparkConf() {
-    return sparkConf;
-  }
-
   private synchronized void startHeartbeat() {
     shuffleWriteClient.registerApplicationInfo(id.get(), heartbeatTimeout, user);
     if (!heartbeatStarted) {
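Note: getSparkConf() moves up into RssShuffleManagerBase so that both registration paths can call sparkConfToMap(getSparkConf()). A standalone sketch of the same conversion follows, assuming a Spark driver classpath; SparkConf.getAll() returns Tuple2<String, String>[] and the patch itself reads the tuple fields the same way. The config key used here is only an example.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.SparkConf;
    import scala.Tuple2;

    // Illustrative sketch only: mirrors sparkConfToMap(...) above, outside the manager class.
    public class SparkConfToMapExample {
      public static Map<String, String> toMap(SparkConf conf) {
        Map<String, String> map = new HashMap<>();
        for (Tuple2<String, String> entry : conf.getAll()) {
          map.put(entry._1, entry._2);
        }
        return map;
      }

      public static void main(String[] args) {
        SparkConf conf = new SparkConf().set("spark.rss.storage.type", "MEMORY_LOCALFILE");
        System.out.println(toMap(conf).get("spark.rss.storage.type")); // MEMORY_LOCALFILE
      }
    }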
diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
index ce0458219f..ff89f31816 100644
--- a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
+++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java
@@ -720,7 +720,8 @@ public void registerShuffle(
         ShuffleDataDistributionType dataDistributionType,
         int maxConcurrencyPerPartitionToWrite,
         int stageAttemptNumber,
-        RssProtos.MergeContext mergeContext) {}
+        RssProtos.MergeContext mergeContext,
+        Map<String, String> properties) {}
 
     @Override
     public boolean sendCommit(
diff --git a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
index d21c7e67b7..caab46020f 100644
--- a/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
+++ b/client/src/main/java/org/apache/uniffle/client/api/ShuffleWriteClient.java
@@ -73,7 +73,53 @@ default void registerShuffle(
         dataDistributionType,
         maxConcurrencyPerPartitionToWrite,
         0,
-        null);
+        null,
+        Collections.emptyMap());
+  }
+
+  default void registerShuffle(
+      ShuffleServerInfo shuffleServerInfo,
+      String appId,
+      int shuffleId,
+      List<PartitionRange> partitionRanges,
+      RemoteStorageInfo remoteStorage,
+      ShuffleDataDistributionType dataDistributionType,
+      int maxConcurrencyPerPartitionToWrite,
+      Map<String, String> properties) {
+    registerShuffle(
+        shuffleServerInfo,
+        appId,
+        shuffleId,
+        partitionRanges,
+        remoteStorage,
+        dataDistributionType,
+        maxConcurrencyPerPartitionToWrite,
+        0,
+        null,
+        properties);
+  }
+
+  default void registerShuffle(
+      ShuffleServerInfo shuffleServerInfo,
+      String appId,
+      int shuffleId,
+      List<PartitionRange> partitionRanges,
+      RemoteStorageInfo remoteStorage,
+      ShuffleDataDistributionType dataDistributionType,
+      int maxConcurrencyPerPartitionToWrite,
+      int stageAttemptNumber,
+      MergeContext mergeContext) {
+    registerShuffle(
+        shuffleServerInfo,
+        appId,
+        shuffleId,
+        partitionRanges,
+        remoteStorage,
+        dataDistributionType,
+        maxConcurrencyPerPartitionToWrite,
+        stageAttemptNumber,
+        mergeContext,
+        Collections.emptyMap());
   }
 
   void registerShuffle(
@@ -85,7 +131,8 @@ void registerShuffle(
       ShuffleDataDistributionType dataDistributionType,
       int maxConcurrencyPerPartitionToWrite,
       int stageAttemptNumber,
-      MergeContext mergeContext);
+      MergeContext mergeContext,
+      Map<String, String> properties);
 
   boolean sendCommit(
       Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps);
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index c81d3c7255..ac93d57b1e 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -565,7 +565,8 @@ public void registerShuffle(
       ShuffleDataDistributionType dataDistributionType,
       int maxConcurrencyPerPartitionToWrite,
       int stageAttemptNumber,
-      MergeContext mergeContext) {
+      MergeContext mergeContext,
+      Map<String, String> properties) {
     String user = null;
     try {
       user = UserGroupInformation.getCurrentUser().getShortUserName();
@@ -583,7 +584,8 @@ public void registerShuffle(
             dataDistributionType,
             maxConcurrencyPerPartitionToWrite,
             stageAttemptNumber,
-            mergeContext);
+            mergeContext,
+            properties);
 
     RssRegisterShuffleResponse response =
         getShuffleServerClient(shuffleServerInfo).registerShuffle(request);
diff --git a/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleWriteClient.java b/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleWriteClient.java
index 6798a792cb..d856292f20 100644
--- a/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleWriteClient.java
+++ b/client/src/test/java/org/apache/uniffle/client/record/reader/MockedShuffleWriteClient.java
@@ -64,7 +64,8 @@ public void registerShuffle(
       ShuffleDataDistributionType dataDistributionType,
       int maxConcurrencyPerPartitionToWrite,
       int stageAttemptNumber,
-      RssProtos.MergeContext mergeContext) {}
+      RssProtos.MergeContext mergeContext,
+      Map<String, String> properties) {}
 
   @Override
   public boolean sendCommit(
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index d63c2e46e8..79ceb2f10f 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -91,4 +91,6 @@ private Constants() {}
   public static final String DRIVER_HOST = "driver.host";
 
   public static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
+
+  public static final String SPARK_RSS_CONFIG_PREFIX = "spark.";
 }
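Note: ShuffleWriteClient keeps source compatibility by turning the older signatures into default methods that funnel into the single abstract method, passing Collections.emptyMap() when no properties are supplied. A self-contained sketch of that overload-funnel pattern follows; the types are simplified and are not the real interface.

    import java.util.Collections;
    import java.util.Map;

    // Illustrative sketch only: old callers keep the short signature, new callers pass
    // properties, and only the widest method has to be implemented.
    interface Registrar {
      default void register(String appId, int shuffleId) {
        register(appId, shuffleId, Collections.emptyMap());
      }

      void register(String appId, int shuffleId, Map<String, String> properties);
    }

    public class OverloadFunnelExample implements Registrar {
      @Override
      public void register(String appId, int shuffleId, Map<String, String> properties) {
        System.out.println(appId + "/" + shuffleId + " -> " + properties);
      }

      public static void main(String[] args) {
        new OverloadFunnelExample().register("app_1", 0); // app_1/0 -> {}
      }
    }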
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 20b6bf98b1..6a95b3122e 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -198,7 +198,8 @@ private ShuffleRegisterResponse doRegisterShuffle(
       ShuffleDataDistributionType dataDistributionType,
       int maxConcurrencyPerPartitionToWrite,
       int stageAttemptNumber,
-      MergeContext mergeContext) {
+      MergeContext mergeContext,
+      Map<String, String> properties) {
     ShuffleRegisterRequest.Builder reqBuilder = ShuffleRegisterRequest.newBuilder();
     reqBuilder
         .setAppId(appId)
@@ -207,7 +208,8 @@ private ShuffleRegisterResponse doRegisterShuffle(
         .setShuffleDataDistribution(RssProtos.DataDistribution.valueOf(dataDistributionType.name()))
         .setMaxConcurrencyPerPartitionToWrite(maxConcurrencyPerPartitionToWrite)
         .addAllPartitionRanges(toShufflePartitionRanges(partitionRanges))
-        .setStageAttemptNumber(stageAttemptNumber);
+        .setStageAttemptNumber(stageAttemptNumber)
+        .putAllProperties(properties);
     if (mergeContext != null) {
       reqBuilder.setMergeContext(mergeContext);
     }
@@ -484,7 +486,8 @@ public RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest requ
             request.getDataDistributionType(),
             request.getMaxConcurrencyPerPartitionToWrite(),
             request.getStageAttemptNumber(),
-            request.getMergeContext());
+            request.getMergeContext(),
+            request.getProperties());
     RssRegisterShuffleResponse response;
 
     RssProtos.StatusCode statusCode = rpcResponse.getStatus();
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
index 92ed1e15e9..a2cac5367f 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssRegisterShuffleRequest.java
@@ -17,8 +17,11 @@
 package org.apache.uniffle.client.request;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.uniffle.common.PartitionRange;
@@ -39,7 +42,9 @@ public class RssRegisterShuffleRequest {
   private int stageAttemptNumber;
   private final MergeContext mergeContext;
+  private Map<String, String> properties;
 
+  @VisibleForTesting
   public RssRegisterShuffleRequest(
       String appId,
       int shuffleId,
@@ -57,7 +62,8 @@ public RssRegisterShuffleRequest(
         dataDistributionType,
         maxConcurrencyPerPartitionToWrite,
         0,
-        null);
+        null,
+        Collections.emptyMap());
   }
 
   public RssRegisterShuffleRequest(
@@ -69,7 +75,8 @@ public RssRegisterShuffleRequest(
       ShuffleDataDistributionType dataDistributionType,
       int maxConcurrencyPerPartitionToWrite,
       int stageAttemptNumber,
-      MergeContext mergeContext) {
+      MergeContext mergeContext,
+      Map<String, String> properties) {
     this.appId = appId;
     this.shuffleId = shuffleId;
     this.partitionRanges = partitionRanges;
@@ -79,8 +86,10 @@ public RssRegisterShuffleRequest(
     this.maxConcurrencyPerPartitionToWrite = maxConcurrencyPerPartitionToWrite;
     this.stageAttemptNumber = stageAttemptNumber;
     this.mergeContext = mergeContext;
+    this.properties = properties;
   }
 
+  @VisibleForTesting
   public RssRegisterShuffleRequest(
       String appId,
       int shuffleId,
@@ -97,7 +106,8 @@ public RssRegisterShuffleRequest(
         dataDistributionType,
         RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE.defaultValue(),
         0,
-        null);
+        null,
+        Collections.emptyMap());
   }
 
   public RssRegisterShuffleRequest(
@@ -111,7 +121,8 @@ public RssRegisterShuffleRequest(
         ShuffleDataDistributionType.NORMAL,
         RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE.defaultValue(),
         0,
-        null);
+        null,
+        Collections.emptyMap());
   }
 
   public String getAppId() {
@@ -149,4 +160,8 @@ public int getStageAttemptNumber() {
   public MergeContext getMergeContext() {
     return mergeContext;
   }
+
+  public Map<String, String> getProperties() {
+    return properties;
+  }
 }
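Note: on the wire, the gRPC client above copies the map into the request with putAllProperties, backed by the map<string, string> field added to ShuffleRegisterRequest in Rss.proto just below. A hedged sketch of the builder call follows; it assumes the standard protobuf-java codegen for map fields and that the generated class lives under org.apache.uniffle.proto.RssProtos (the patch only shows RssProtos references, not the package).

    import java.util.Map;

    import org.apache.uniffle.proto.RssProtos.ShuffleRegisterRequest;

    // Illustrative sketch only: putAllProperties / getPropertiesMap are the accessors
    // protobuf-java generates for a map<string, string> field.
    public class RegisterRequestProtoExample {
      public static ShuffleRegisterRequest build(
          String appId, int shuffleId, Map<String, String> properties) {
        return ShuffleRegisterRequest.newBuilder()
            .setAppId(appId)
            .setShuffleId(shuffleId)
            .putAllProperties(properties)
            .build();
      }
    }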
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index d92ec40c7a..5e8cc632d5 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -197,6 +197,7 @@ message ShuffleRegisterRequest {
   int32 maxConcurrencyPerPartitionToWrite = 7;
   int32 stageAttemptNumber = 8;
   MergeContext mergeContext = 9;
+  map<string, string> properties = 10;
 }
 
 enum DataDistribution {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 574b9ef0af..a73327a461 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -184,11 +184,14 @@ public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception {
       throw new EventRetryException();
     }
     long endTime = System.currentTimeMillis();
-
-    // update some metrics for shuffle task
-    updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks());
     ShuffleTaskInfo shuffleTaskInfo =
         shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId());
+    if (shuffleTaskInfo == null || !shuffleTaskInfo.isClientStorageTypeWithMemory()) {
+      // A client whose storage type includes MEMORY never needs committedBlockIds,
+      // because it does not call the finish-shuffle rpc that reads them.
+      // update some metrics for shuffle task
+      updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks());
+    }
     if (isStorageAuditLogEnabled) {
       AUDIT_LOGGER.info(
           String.format(
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 310afcb231..6e6543abff 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -719,6 +719,11 @@ public class ShuffleServerConf extends RssBaseConf {
           .booleanType()
           .defaultValue(false)
           .withDescription("Whether to enable app detail log");
+  public static final ConfigOption<Boolean> SERVER_WITH_MEMORY_STORAGE_TYPE_OPTIMIZE_ENABLED =
+      ConfigOptions.key("rss.server.with.memory.storage.type.opt.enabled")
+          .booleanType()
+          .defaultValue(true)
+          .withDescription("Whether to enable the optimization for clients whose storage type includes memory");
 
   public ShuffleServerConf() {}
 
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 994a25c890..212b447fed 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -322,7 +322,8 @@ public void registerShuffle(
             new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
             user,
             shuffleDataDistributionType,
-            maxConcurrencyPerPartitionToWrite);
+            maxConcurrencyPerPartitionToWrite,
+            req.getPropertiesMap());
     if (StatusCode.SUCCESS == result
         && shuffleServer.isRemoteMergeEnable()
         && req.hasMergeContext()) {
@@ -338,7 +339,8 @@ public void registerShuffle(
                 new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
                 user,
                 shuffleDataDistributionType,
-                maxConcurrencyPerPartitionToWrite);
+                maxConcurrencyPerPartitionToWrite,
+                req.getPropertiesMap());
     if (result == StatusCode.SUCCESS) {
       result =
           shuffleServer
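Note: the server-side switch is rss.server.with.memory.storage.type.opt.enabled (default true); setting it to false in the shuffle server conf keeps the old bookkeeping. The effective guard combines that flag with the storage type the client reported at registration. A self-contained sketch of the decision follows, under the assumption that a missing ShuffleTaskInfo is treated conservatively, as in the flush-manager change above.

    // Illustrative sketch only: when the optimization is on and the client storage type
    // includes MEMORY, the server skips maintaining the committed/cached block-id bitmaps,
    // because such clients never call the finish-shuffle RPC that reads them.
    public class MemoryStorageTypeGuardExample {
      static boolean shouldTrackBlockIdBitmaps(boolean optimizeEnabled, Boolean clientWithMemory) {
        if (clientWithMemory == null) {
          return true; // unknown client: keep tracking
        }
        return !(optimizeEnabled && clientWithMemory);
      }

      public static void main(String[] args) {
        System.out.println(shouldTrackBlockIdBitmaps(true, true));  // false -> skip bitmap updates
        System.out.println(shouldTrackBlockIdBitmaps(true, false)); // true
        System.out.println(shouldTrackBlockIdBitmaps(false, true)); // true
      }
    }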
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index 94987d6614..31f0d9ee38 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -23,16 +23,21 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.PartitionInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.config.RssClientConf;
+import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.UnitConverter;
+import org.apache.uniffle.storage.util.StorageType;
 
 /**
  * ShuffleTaskInfo contains the information of submitting the shuffle, the information of the cache
@@ -74,6 +79,8 @@ public class ShuffleTaskInfo {
   private final Map shuffleDetailInfos;
   private final Map latestStageAttemptNumbers;
+  private Map<String, String> properties;
+  private boolean clientStorageTypeWithMemory = false;
 
   public ShuffleTaskInfo(String appId) {
     this.appId = appId;
@@ -287,6 +294,10 @@ public long getPartitionNum() {
     return partitionDataSizes.values().stream().mapToLong(Map::size).sum();
   }
 
+  public boolean isClientStorageTypeWithMemory() {
+    return clientStorageTypeWithMemory;
+  }
+
   @Override
   public String toString() {
     return "ShuffleTaskInfo{"
@@ -307,4 +318,25 @@ public String toString() {
         + shuffleDetailInfos
         + '}';
   }
+
+  public void setProperties(Map<String, String> properties, ShuffleServerConf serverConf) {
+    Map<String, String> filteredProperties =
+        properties.entrySet().stream()
+            .filter(entry -> entry.getKey().contains(".rss."))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    this.properties = filteredProperties;
+    LOGGER.info("{} set properties to {}", appId, properties);
+    if (serverConf.getBoolean(ShuffleServerConf.SERVER_WITH_MEMORY_STORAGE_TYPE_OPTIMIZE_ENABLED)) {
+      String storageType = properties.get(RssClientConf.RSS_STORAGE_TYPE.key());
+      if (StringUtils.isEmpty(storageType)) {
+        storageType =
+            properties.get(
+                Constants.SPARK_RSS_CONFIG_PREFIX + RssClientConf.RSS_STORAGE_TYPE.key());
+      }
+      if (StringUtils.isNotEmpty(storageType)) {
+        clientStorageTypeWithMemory = StorageType.withMemory(StorageType.valueOf(storageType));
+        LOGGER.info("{} set clientStorageTypeWithMemory to {}", appId, clientStorageTypeWithMemory);
+      }
+    }
+  }
 }
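Note: setProperties above keeps only keys containing ".rss." and resolves the storage type from either the bare client key (rss.storage.type) or the Spark-prefixed one (SPARK_RSS_CONFIG_PREFIX + the same key). A self-contained sketch of that lookup follows; the final name check stands in for Uniffle's StorageType.withMemory(...), and the key string is assumed to be what RssClientConf.RSS_STORAGE_TYPE.key() returns.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only: resolve the client storage type with a Spark-prefix fallback
    // and decide whether it includes an in-memory tier.
    public class StorageTypeDetectionExample {
      static final String STORAGE_TYPE_KEY = "rss.storage.type";
      static final String SPARK_PREFIX = "spark.";

      static boolean clientStorageTypeWithMemory(Map<String, String> clientProperties) {
        String storageType = clientProperties.get(STORAGE_TYPE_KEY);
        if (storageType == null || storageType.isEmpty()) {
          storageType = clientProperties.get(SPARK_PREFIX + STORAGE_TYPE_KEY);
        }
        return storageType != null && storageType.startsWith("MEMORY");
      }

      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("spark.rss.storage.type", "MEMORY_LOCALFILE");
        System.out.println(clientStorageTypeWithMemory(props)); // true
      }
    }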
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index fa531ba024..be7ae1b883 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -279,7 +280,8 @@ public StatusCode registerShuffle(
         remoteStorageInfo,
         user,
         ShuffleDataDistributionType.NORMAL,
-        -1);
+        -1,
+        Collections.emptyMap());
   }
 
   public StatusCode registerShuffle(
@@ -289,13 +291,15 @@ public StatusCode registerShuffle(
       RemoteStorageInfo remoteStorageInfo,
       String user,
       ShuffleDataDistributionType dataDistType,
-      int maxConcurrencyPerPartitionToWrite) {
+      int maxConcurrencyPerPartitionToWrite,
+      Map<String, String> properties) {
     ReentrantReadWriteLock.WriteLock lock = getAppWriteLock(appId);
     lock.lock();
     try {
       refreshAppId(appId);
       ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
+      taskInfo.setProperties(properties, conf);
       taskInfo.setUser(user);
       taskInfo.setSpecification(
           ShuffleSpecification.builder()
@@ -498,15 +502,23 @@ public void updateCachedBlockIds(
     }
     ShuffleTaskInfo shuffleTaskInfo =
         shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo(appId));
-    Roaring64NavigableMap bitmap =
-        shuffleTaskInfo
-            .getCachedBlockIds()
-            .computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());
-    long size = 0L;
-    synchronized (bitmap) {
+    // A client whose storage type includes MEMORY never needs cachedBlockIds,
+    // because it does not call the finish-shuffle rpc that reads them.
+    if (!shuffleTaskInfo.isClientStorageTypeWithMemory()) {
+      Roaring64NavigableMap bitmap =
+          shuffleTaskInfo
+              .getCachedBlockIds()
+              .computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());
+
+      synchronized (bitmap) {
+        for (ShufflePartitionedBlock spb : spbs) {
+          bitmap.addLong(spb.getBlockId());
+          size += spb.getEncodedLength();
+        }
+      }
+    } else {
       for (ShufflePartitionedBlock spb : spbs) {
-        bitmap.addLong(spb.getBlockId());
         size += spb.getEncodedLength();
       }
     }
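Note: in updateCachedBlockIds the per-shuffle bitmap is now only built for clients that will later call the finish-shuffle RPC; the cached-data size is still accumulated on both paths so memory accounting stays intact. A self-contained sketch of that control flow follows, with plain collections standing in for Roaring64NavigableMap and ShufflePartitionedBlock.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Illustrative sketch only: block ids are tracked only for non-memory clients,
    // while the encoded size is summed either way.
    public class CachedBlockIdsExample {
      static long record(boolean clientStorageTypeWithMemory, Set<Long> cachedBlockIds,
          List<long[]> blocks /* {blockId, encodedLength} pairs */) {
        long size = 0L;
        if (!clientStorageTypeWithMemory) {
          for (long[] block : blocks) {
            cachedBlockIds.add(block[0]);
            size += block[1];
          }
        } else {
          for (long[] block : blocks) {
            size += block[1];
          }
        }
        return size;
      }

      public static void main(String[] args) {
        List<long[]> blocks = new ArrayList<>();
        blocks.add(new long[] {1L, 128L});
        blocks.add(new long[] {2L, 256L});
        Set<Long> ids = new HashSet<>();
        System.out.println(record(true, new HashSet<>(), blocks)); // 384, no ids tracked
        System.out.println(record(false, ids, blocks) + " " + ids); // 384 [1, 2]
      }
    }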