Skip to content

Commit 32187f8

Browse files
authored
[#2426] improvement: Skip the application which is expired when flushing buffers (#2427)
### What changes were proposed in this pull request? Skip the application which is expired when flushing buffers. ### Why are the changes needed? All RPC executors will be blocked when flushBuffer and removeResources are triggered at the same time in the same app. Fix: #2426 ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing UTs.
1 parent fe447f2 commit 32187f8

2 files changed

Lines changed: 50 additions & 22 deletions

File tree

server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,15 @@ public class ShuffleServerConf extends RssBaseConf {
480480
+ " The cpu usage of the shuffle server will be reduced."
481481
+ " But SKIP_LIST doesn't support the slow-start feature of MR.");
482482

483+
public static final ConfigOption<Integer> SERVER_SHUFFLE_FLUSH_TRYLOCK_TIMEOUT =
484+
ConfigOptions.key("rss.server.flush.tryLockTimeoutMs")
485+
.intType()
486+
.defaultValue(100)
487+
.withDescription(
488+
"Before the shuffle buffers of the application flush, "
489+
+ "it will try to get the lock of the application. If the time to wait for the lock"
490+
+ " is too long, the rpc threads will be blocked for a long time.");
491+
483492
public static final ConfigOption<Long> SERVER_SHUFFLE_FLUSH_THRESHOLD =
484493
ConfigOptions.key("rss.server.shuffle.flush.threshold")
485494
.longType()

server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Map;
2525
import java.util.Map.Entry;
2626
import java.util.Set;
27+
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.atomic.AtomicLong;
2829
import java.util.concurrent.locks.ReentrantReadWriteLock;
2930

@@ -65,6 +66,7 @@ public class ShuffleBufferManager {
6566
private static final Logger LOG = LoggerFactory.getLogger(ShuffleBufferManager.class);
6667

6768
private final ShuffleBufferType shuffleBufferType;
69+
private final int flushTryLockTimeout;
6870
private ShuffleTaskManager shuffleTaskManager;
6971
private final ShuffleFlushManager shuffleFlushManager;
7072
private long capacity;
@@ -150,7 +152,7 @@ public ShuffleBufferManager(
150152
appBlockSizeMetricEnabled =
151153
conf.getBoolean(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED);
152154
shuffleBufferType = conf.get(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE);
153-
155+
flushTryLockTimeout = conf.get(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_TRYLOCK_TIMEOUT);
154156
ShuffleServerMetrics.addLabeledCacheGauge(
155157
BLOCK_COUNT_IN_BUFFER_POOL,
156158
() ->
@@ -597,30 +599,47 @@ private synchronized void flush(Map<String, Set<Integer>> requiredFlush) {
597599
bufferPool.entrySet()) {
598600
String appId = appIdToBuffers.getKey();
599601
if (requiredFlush.containsKey(appId)) {
600-
for (Map.Entry<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers :
601-
appIdToBuffers.getValue().entrySet()) {
602-
int shuffleId = shuffleIdToBuffers.getKey();
603-
Set<Integer> requiredShuffleId = requiredFlush.get(appId);
604-
if (requiredShuffleId != null && requiredShuffleId.contains(shuffleId)) {
605-
for (Map.Entry<Range<Integer>, ShuffleBuffer> rangeEntry :
606-
shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
607-
Range<Integer> range = rangeEntry.getKey();
608-
ShuffleBuffer shuffleBuffer = rangeEntry.getValue();
609-
pickedFlushSize += shuffleBuffer.getEncodedLength();
610-
flushBuffer(
611-
shuffleBuffer,
612-
appId,
613-
shuffleId,
614-
range.lowerEndpoint(),
615-
range.upperEndpoint(),
616-
HugePartitionUtils.isHugePartition(
617-
shuffleTaskManager, appId, shuffleId, range.lowerEndpoint()));
618-
if (pickedFlushSize > expectedFlushSize) {
619-
LOG.info("Already picked enough buffers to flush {} bytes", pickedFlushSize);
620-
return;
602+
if (shuffleTaskManager.isAppExpired(appId)) {
603+
continue;
604+
}
605+
ReentrantReadWriteLock.ReadLock readLock = shuffleTaskManager.getAppReadLock(appId);
606+
boolean lockAcquired = false;
607+
try {
608+
lockAcquired = readLock.tryLock(flushTryLockTimeout, TimeUnit.MILLISECONDS);
609+
if (!lockAcquired) {
610+
continue;
611+
}
612+
for (Map.Entry<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers :
613+
appIdToBuffers.getValue().entrySet()) {
614+
int shuffleId = shuffleIdToBuffers.getKey();
615+
Set<Integer> requiredShuffleId = requiredFlush.get(appId);
616+
if (requiredShuffleId != null && requiredShuffleId.contains(shuffleId)) {
617+
for (Map.Entry<Range<Integer>, ShuffleBuffer> rangeEntry :
618+
shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
619+
Range<Integer> range = rangeEntry.getKey();
620+
ShuffleBuffer shuffleBuffer = rangeEntry.getValue();
621+
pickedFlushSize += shuffleBuffer.getEncodedLength();
622+
flushBuffer(
623+
shuffleBuffer,
624+
appId,
625+
shuffleId,
626+
range.lowerEndpoint(),
627+
range.upperEndpoint(),
628+
HugePartitionUtils.isHugePartition(
629+
shuffleTaskManager, appId, shuffleId, range.lowerEndpoint()));
630+
if (pickedFlushSize > expectedFlushSize) {
631+
LOG.info("Already picked enough buffers to flush {} bytes", pickedFlushSize);
632+
return;
633+
}
621634
}
622635
}
623636
}
637+
} catch (InterruptedException e) {
638+
LOG.warn("Ignore the InterruptedException which should be caused by internal killed");
639+
} finally {
640+
if (lockAcquired) {
641+
readLock.unlock();
642+
}
624643
}
625644
}
626645
}

0 commit comments

Comments
 (0)