From 4d972249d87a41074783f37debe27c73259e852d Mon Sep 17 00:00:00 2001 From: lianghongjia Date: Tue, 22 Apr 2025 19:54:28 +0800 Subject: [PATCH 1/3] [FLINK-37608][fs-connector] File sink supports to close buckets in parallel when snapshot --- .../functions/sink/filesystem/Buckets.java | 59 ++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java index 554be18af9ac4..b04877986024d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.legacy.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +37,13 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; /** * The manager of the different active buckets in the {@link StreamingFileSink}. @@ -85,6 +92,8 @@ public class Buckets { private final BucketStateSerializer bucketStateSerializer; + private final ExecutorService snapshotActiveBucketsThreadPool; + /** * A constructor creating a new empty bucket manager. 
* @@ -121,6 +130,8 @@ public Buckets( bucketWriter.getProperties().getPendingFileRecoverableSerializer(), bucketAssigner.getSerializer()); this.maxPartCounter = 0L; + this.snapshotActiveBucketsThreadPool = + Executors.newCachedThreadPool(new ExecutorThreadFactory("snapshot-active-buckets")); } public void setBucketLifeCycleListener( @@ -267,18 +278,49 @@ private void snapshotActiveBuckets( final long checkpointId, final ListState<byte[]> bucketStatesContainer) throws Exception { - for (Bucket<IN, BucketID> bucket : activeBuckets.values()) { - final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId); - + long start = System.currentTimeMillis(); + List<CompletableFuture<BucketState<BucketID>>> futures = + activeBuckets.values().stream() + .map( + bucket -> + CompletableFuture.supplyAsync( + () -> { + try { + BucketState<BucketID> bucketState = + bucket.onReceptionOfCheckpoint( + checkpointId); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Subtask {} checkpointing: {}", + subtaskIndex, + bucketState); + } + return bucketState; + } catch (Exception e) { + throw new CompletionException(e); + } + }, + snapshotActiveBucketsThreadPool)) + .collect(Collectors.toList()); + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + + for (CompletableFuture<BucketState<BucketID>> future : futures) { + BucketState<BucketID> bucketState = future.get(); final byte[] serializedBucketState = SimpleVersionedSerialization.writeVersionAndSerialize( bucketStateSerializer, bucketState); - bucketStatesContainer.add(serializedBucketState); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState); - } + long duration = System.currentTimeMillis() - start; + if (LOG.isDebugEnabled()) { + LOG.debug( + "Subtask {} has completely snapshot the active buckets for the checkpoint with id={} , active buckets size: {}, cost: {}ms", + subtaskIndex, + checkpointId, + activeBuckets.size(), + duration); } } @@ -345,6 +387,9 @@ public void closePartFileForBucket(BucketID bucketID) throws Exception { if (bucket != null) {
bucket.closePartFile(); } + if (snapshotActiveBucketsThreadPool != null) { + snapshotActiveBucketsThreadPool.shutdown(); + } } public void close() { From 046e3c6fde94a87c1549546555739554f9512386 Mon Sep 17 00:00:00 2001 From: lianghongjia Date: Wed, 23 Apr 2025 10:59:54 +0800 Subject: [PATCH 2/3] bug fix: shut down the snapshot thread pool in close() instead of closePartFileForBucket() --- .../streaming/api/functions/sink/filesystem/Buckets.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java index b04877986024d..2a73baf03c4ed 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java @@ -387,15 +387,15 @@ public void closePartFileForBucket(BucketID bucketID) throws Exception { if (bucket != null) { bucket.closePartFile(); } - if (snapshotActiveBucketsThreadPool != null) { - snapshotActiveBucketsThreadPool.shutdown(); - } } public void close() { if (activeBuckets != null) { activeBuckets.values().forEach(Bucket::disposePartFile); } + if (snapshotActiveBucketsThreadPool != null) { + snapshotActiveBucketsThreadPool.shutdown(); + } } private Path assembleBucketPath(BucketID bucketId) { From d9bfc0ea44d9b7c09c92f162cd0ee02f78ebe9d8 Mon Sep 17 00:00:00 2001 From: lianghongjia Date: Wed, 23 Apr 2025 11:17:19 +0800 Subject: [PATCH 3/3] catch IOException explicitly instead of Exception in the snapshot task --- .../flink/streaming/api/functions/sink/filesystem/Buckets.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java index 2a73baf03c4ed..978e7ba9cb8c2 100644 ---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java @@ -296,7 +296,7 @@ private void snapshotActiveBuckets( bucketState); } return bucketState; - } catch (Exception e) { + } catch (IOException e) { throw new CompletionException(e); } },