From 1a2719acb98e9968c8355b28295efb16557856ec Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Tue, 4 Mar 2025 16:35:14 +0000 Subject: [PATCH 1/9] Fix data race in MovingAvg, remove offset gap ratio --- build.gradle.kts | 1 + sdks/java/io/kafka/jmh/build.gradle | 31 +++++ .../sdk/io/kafka/KafkaIOUtilsBenchmark.java | 126 ++++++++++++++++++ .../beam/sdk/io/kafka/KafkaIOUtils.java | 74 +++++++++- .../sdk/io/kafka/KafkaUnboundedReader.java | 18 +-- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 92 ++++--------- settings.gradle.kts | 1 + 7 files changed, 254 insertions(+), 89 deletions(-) create mode 100644 sdks/java/io/kafka/jmh/build.gradle create mode 100644 sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java diff --git a/build.gradle.kts b/build.gradle.kts index 8dcdc14f04e7..664fb8a83d09 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -304,6 +304,7 @@ tasks.register("javaPreCommit") { dependsOn(":sdks:java:io:contextualtextio:build") dependsOn(":sdks:java:io:expansion-service:build") dependsOn(":sdks:java:io:file-based-io-tests:build") + dependsOn(":sdks:java:io:kafka:jmh:build") dependsOn(":sdks:java:io:sparkreceiver:3:build") dependsOn(":sdks:java:io:synthetic:build") dependsOn(":sdks:java:io:xml:build") diff --git a/sdks/java/io/kafka/jmh/build.gradle b/sdks/java/io/kafka/jmh/build.gradle new file mode 100644 index 000000000000..5f3d0d38f092 --- /dev/null +++ b/sdks/java/io/kafka/jmh/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } + +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.kafka.jmh', + enableJmh: true, + publish: false) + +description = "Apache Beam :: SDKs :: Java :: IO :: Kafka :: JMH" +ext.summary = "This contains JMH benchmarks for the Kafka IO connector for Beam Java" + +dependencies { + implementation project(path: ":sdks:java:io:kafka") +} diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java new file mode 100644 index 000000000000..cd216b5460f0 --- /dev/null +++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import java.util.Random; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.IterationParams; +import org.openjdk.jmh.infra.ThreadParams; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(batchSize = KafkaIOUtilsBenchmark.SIZE) +@Measurement(batchSize = KafkaIOUtilsBenchmark.SIZE) +public class KafkaIOUtilsBenchmark { + static final int READERS = 2; + static final int WRITERS = 1; + static final int SIZE = 1024; + + @State(Scope.Thread) + public static class ProducerState { + private int[] values; + private int idx; + + @Setup(Level.Iteration) + public void setup(final IterationParams ip, final ThreadParams tp) { + values = + new Random(299792458 + ip.getCount() + tp.getThreadIndex()) + .ints(KafkaIOUtilsBenchmark.SIZE, 0, 100) + .toArray(); + idx = 0; + } + + @TearDown(Level.Invocation) + public void tearDown(final IterationParams ip, final ThreadParams tp) { + idx = (idx + 1) % KafkaIOUtilsBenchmark.SIZE; + } + } + + @State(Scope.Group) + public static class PlainAccumulatorState { + // As implemented before 2.64.0. + // Note that numUpdates may overflow and count back from Long.MIN_VALUE. 
+ static class MovingAvg { + private static final int MOVING_AVG_WINDOW = 1000; + private double avg = 0; + private long numUpdates = 0; + + void update(double quantity) { + numUpdates++; + avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates); + } + + double get() { + return avg; + } + } + + private MovingAvg accumulator; + + @Setup(Level.Iteration) + public void setup(final IterationParams ip, final ThreadParams tp) { + accumulator = new MovingAvg(); + } + } + + @State(Scope.Group) + public static class AtomicAccumulatorState { + private final KafkaIOUtils.MovingAvg accumulator = new KafkaIOUtils.MovingAvg(); + } + + @Benchmark + @Group("Plain") + @GroupThreads(KafkaIOUtilsBenchmark.WRITERS) + public void plainWrite(final PlainAccumulatorState as, final ProducerState ps) { + as.accumulator.update(ps.values[ps.idx]); + } + + @Benchmark + @Group("Plain") + @GroupThreads(KafkaIOUtilsBenchmark.READERS) + public double plainRead(final PlainAccumulatorState as) { + return as.accumulator.get(); + } + + @Benchmark + @Group("Atomic") + @GroupThreads(KafkaIOUtilsBenchmark.WRITERS) + public void atomicWrite(final AtomicAccumulatorState as, final ProducerState ps) { + as.accumulator.update(ps.values[ps.idx]); + } + + @Benchmark + @Group("Atomic") + @GroupThreads(KafkaIOUtilsBenchmark.READERS) + public double atomicRead(final AtomicAccumulatorState as) { + return as.accumulator.get(); + } +} diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java index 95f95000a58f..13c688408510 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; @@ -129,19 +130,78 @@ static Map getOffsetConsumerConfig( return offsetConsumerConfig; } - // Maintains approximate average over last 1000 elements - static class MovingAvg { + /* + * Attempt to prevent false sharing by padding to at least 64 bytes. + * object header: 4, 8, 12 or 16 bytes + * alignment: at least 8 bytes + */ + private static class MovingAvgPadding { + byte p000, p001, p002, p003, p004, p005, p006, p007; + byte p010, p011, p012, p013, p014, p015, p016, p017; + byte p020, p021, p022, p023, p024, p025, p026, p027; + byte p030, p031, p032, p033, p034, p035, p036, p037; + byte p040, p041, p042, p043, p044, p045, p046, p047; + byte p050, p051, p052, p053, p054, p055, p056, p057; + byte p060, p061, p062, p063, p064, p065, p066, p067; + } + + // The accumulator's fields should be padded to at least 128 bytes (at least 1 or 2 + // cache lines). 
+ private static class MovingAvgFields extends MovingAvgPadding { private static final int MOVING_AVG_WINDOW = 1000; - private double avg = 0; + + private static final AtomicLongFieldUpdater AVG = + AtomicLongFieldUpdater.newUpdater(MovingAvgFields.class, "avg"); + + private volatile long avg = 0; private long numUpdates = 0; - void update(double quantity) { - numUpdates++; - avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates); + protected double getAvg() { + return Double.longBitsToDouble(avg); + } + + protected void setAvg(final double value) { + AVG.lazySet(this, Double.doubleToRawLongBits(value)); + } + + protected long incrementAndGetNumUpdates() { + final long nextNumUpdates = Math.min(MOVING_AVG_WINDOW, numUpdates + 1); + numUpdates = nextNumUpdates; + return nextNumUpdates; + } + } + + /* + * Maintains approximate average over last 1000 elements. + * Usage is only thread-safe for a single producer and multiple consumers. + * + * Attempt to prevent false sharing by padding to 64 bytes. + * avg: 8 bytes + * numUpdates: 8 bytes + * alignment: at least 8 bytes + * + * Visibility and ordering of non-volatile loads/stores on numUpdates is guaranteed by volatile loads/stores on avg. + * Sanity of visibility is only useful when the writer thread changes since avg is the only field that can be shared between multiple concurrent threads. + */ + static class MovingAvg extends MovingAvgFields { + byte p100, p101, p102, p103, p104, p105, p106, p107; + byte p110, p111, p112, p113, p114, p115, p116, p117; + byte p120, p121, p122, p123, p124, p125, p126, p127; + byte p130, p131, p132, p133, p134, p135, p136, p137; + byte p140, p141, p142, p143, p144, p145, p146, p147; + byte p150, p151, p152, p153, p154, p155, p156, p157; + + void update(final double quantity) { + final double prevAvg = getAvg(); // volatile load (acquire) + + final long nextNumUpdates = incrementAndGetNumUpdates(); // normal load/store + final double nextAvg = prevAvg + (quantity - prevAvg) / nextNumUpdates; // normal load/store + + setAvg(nextAvg); // ordered store (release) } double get() { - return avg; + return getAvg(); // volatile load (acquire) } } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 6d5b706a987a..7d735d9a8b0a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -187,13 +187,6 @@ public boolean advance() throws IOException { continue; } - long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled. - - if (curRecord == null) { - LOG.info("{}: first record offset {}", name, offset); - offsetGap = 0; - } - // Apply user deserializers. User deserializers might throw, which will be propagated up // and 'curRecord' remains unchanged. The runner should close this reader. // TODO: write records that can't be deserialized to a "dead-letter" additional output. @@ -219,7 +212,7 @@ public boolean advance() throws IOException { int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) + (rawRecord.value() == null ? 
0 : rawRecord.value().length); - pState.recordConsumed(offset, recordSize, offsetGap); + pState.recordConsumed(offset, recordSize); bytesRead.inc(recordSize); bytesReadBySplit.inc(recordSize); @@ -470,8 +463,6 @@ private static class PartitionState { private Iterator> recordIter = Collections.emptyIterator(); private KafkaIOUtils.MovingAvg avgRecordSize = new KafkaIOUtils.MovingAvg(); - private KafkaIOUtils.MovingAvg avgOffsetGap = - new KafkaIOUtils.MovingAvg(); // > 0 only when log compaction is enabled. PartitionState( TopicPartition partition, long nextOffset, TimestampPolicy timestampPolicy) { @@ -487,13 +478,12 @@ public TopicPartition topicPartition() { return topicPartition; } - // Update consumedOffset, avgRecordSize, and avgOffsetGap - void recordConsumed(long offset, int size, long offsetGap) { + // Update consumedOffset and avgRecordSize + void recordConsumed(long offset, int size) { nextOffset = offset + 1; // This is always updated from single thread. Probably not worth making atomic. avgRecordSize.update(size); - avgOffsetGap.update(offsetGap); } synchronized void setLatestOffset(long latestOffset, Instant fetchTime) { @@ -521,7 +511,7 @@ synchronized long backlogMessageCount() { if (latestOffset < 0 || nextOffset < 0) { return UnboundedReader.BACKLOG_UNKNOWN; } - double remaining = (latestOffset - nextOffset) / (1 + avgOffsetGap.get()); + double remaining = latestOffset - nextOffset; return Math.max(0, (long) Math.ceil(remaining)); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 1cf4aad34e4e..17b54e1beaf3 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -30,8 +30,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors; import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; @@ -218,7 +216,7 @@ private static final class SharedStateHolder { private static final Map> OFFSET_ESTIMATOR_CACHE = new ConcurrentHashMap<>(); - private static final Map> + private static final Map> AVG_RECORD_SIZE_CACHE = new ConcurrentHashMap<>(); } @@ -248,8 +246,7 @@ private static final class SharedStateHolder { private transient @Nullable LoadingCache offsetEstimatorCache; - private transient @Nullable LoadingCache - avgRecordSizeCache; + private transient @Nullable LoadingCache avgRecordSizeCache; private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L; @VisibleForTesting final long consumerPollingTimeout; @VisibleForTesting final DeserializerProvider keyDeserializerProvider; @@ -360,24 +357,22 @@ public WatermarkEstimator newWatermarkEstimator( public double getSize( @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange offsetRange) throws ExecutionException { - // If present, estimates the record size to offset gap ratio. Compacted topics may hold less - // records than the estimated offset range due to record deletion within a partition. - final LoadingCache avgRecordSizeCache = + // If present, estimates the record size. 
+ final LoadingCache avgRecordSizeCache = Preconditions.checkStateNotNull(this.avgRecordSizeCache); - final @Nullable AverageRecordSize avgRecordSize = + final @Nullable MovingAvg avgRecordSize = avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor); // The tracker estimates the offset range by subtracting the last claimed position from the // currently observed end offset for the partition belonging to this split. - double estimatedOffsetRange = + final double estimatedOffsetRange = restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining(); - // Before processing elements, we don't have a good estimated size of records and offset gap. - // Return the estimated offset range without scaling by a size to gap ratio. - if (avgRecordSize == null) { - return estimatedOffsetRange; - } - // When processing elements, a moving average estimates the size of records and offset gap. - // Return the estimated offset range scaled by the estimated size to gap ratio. - return estimatedOffsetRange * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio(); + + // Before processing elements, we don't have a good estimated size of records. + // When processing elements, a moving average estimates the size of records. + // Return the estimated offset range scaled by the estimated size if present. + return avgRecordSize == null + ? estimatedOffsetRange + : estimatedOffsetRange * avgRecordSize.get(); } @NewTracker @@ -406,7 +401,7 @@ public ProcessContinuation processElement( WatermarkEstimator watermarkEstimator, MultiOutputReceiver receiver) throws Exception { - final LoadingCache avgRecordSizeCache = + final LoadingCache avgRecordSizeCache = Preconditions.checkStateNotNull(this.avgRecordSizeCache); final LoadingCache offsetEstimatorCache = Preconditions.checkStateNotNull(this.offsetEstimatorCache); @@ -415,7 +410,7 @@ public ProcessContinuation processElement( final Deserializer valueDeserializerInstance = Preconditions.checkStateNotNull(this.valueDeserializerInstance); final TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition(); - final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor); + // TODO: Metrics should be reported per split instead of partition, add bootstrap server hash? final Distribution rawSizes = Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString()); @@ -454,6 +449,8 @@ public ProcessContinuation processElement( final Stopwatch sw = Stopwatch.createStarted(); while (true) { + // Fetch the record size accumulator. + final MovingAvg avgRecordSize = avgRecordSizeCache.getUnchecked(kafkaSourceDescriptor); rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition()); // When there are no records available for the current TopicPartition, self-checkpoint // and move to process the next element. @@ -516,9 +513,7 @@ public ProcessContinuation processElement( int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) + (rawRecord.value() == null ? 
0 : rawRecord.value().length); - avgRecordSizeCache - .getUnchecked(kafkaSourceDescriptor) - .update(recordSize, rawRecord.offset() - expectedOffset); + avgRecordSize.update(recordSize); rawSizes.update(recordSize); expectedOffset = rawRecord.offset() + 1; Instant outputTimestamp; @@ -557,7 +552,7 @@ public ProcessContinuation processElement( offsetEstimatorCache.get(kafkaSourceDescriptor).estimate())) .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128) .doubleValue() - * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio())); + * avgRecordSize.get())); } } } @@ -617,7 +612,7 @@ public Coder restrictionCoder() { @Setup public void setup() throws Exception { - // Start to track record size and offset gap per bundle. + // Start to track record size. avgRecordSizeCache = SharedStateHolder.AVG_RECORD_SIZE_CACHE.computeIfAbsent( fnId, @@ -625,11 +620,11 @@ public void setup() throws Exception { return CacheBuilder.newBuilder() .maximumSize(1000L) .build( - new CacheLoader() { + new CacheLoader() { @Override - public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor) + public MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception { - return new AverageRecordSize(); + return new MovingAvg(); } }); }); @@ -676,7 +671,7 @@ public KafkaLatestOffsetEstimator load( @Teardown public void teardown() throws Exception { - final LoadingCache avgRecordSizeCache = + final LoadingCache avgRecordSizeCache = Preconditions.checkStateNotNull(this.avgRecordSizeCache); final LoadingCache offsetEstimatorCache = Preconditions.checkStateNotNull(this.offsetEstimatorCache); @@ -715,45 +710,6 @@ private Map overrideBootstrapServersConfig( return config; } - // TODO: Collapse the two moving average trackers into a single accumulator using a single Guava - // AtomicDouble. Note that this requires that a single thread will call update and that while get - // may be called by multiple threads the method must only load the accumulator itself. - @ThreadSafe - private static class AverageRecordSize { - @GuardedBy("this") - private MovingAvg avgRecordSize; - - @GuardedBy("this") - private MovingAvg avgRecordGap; - - public AverageRecordSize() { - this.avgRecordSize = new MovingAvg(); - this.avgRecordGap = new MovingAvg(); - } - - public synchronized void update(int recordSize, long gap) { - avgRecordSize.update(recordSize); - avgRecordGap.update(gap); - } - - public double estimateRecordByteSizeToOffsetCountRatio() { - double avgRecordSize; - double avgRecordGap; - - synchronized (this) { - avgRecordSize = this.avgRecordSize.get(); - avgRecordGap = this.avgRecordGap.get(); - } - - // The offset increases between records in a batch fetched from a compacted topic may be - // greater than 1. Compacted topics only store records with the greatest offset per key per - // partition, the records in between are deleted and will not be observed by a consumer. - // The observed gap between offsets is used to estimate the number of records that are likely - // to be observed for the provided number of records. 
- return avgRecordSize / (1 + avgRecordGap); - } - } - private static Instant ensureTimestampWithinBounds(Instant timestamp) { if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) { timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; diff --git a/settings.gradle.kts b/settings.gradle.kts index 470d6c020a52..1851edaeb336 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -241,6 +241,7 @@ include(":sdks:java:io:jdbc") include(":sdks:java:io:jms") include(":sdks:java:io:json") include(":sdks:java:io:kafka") +include(":sdks:java:io:kafka:jmh") include(":sdks:java:io:kafka:upgrade") include(":sdks:java:io:kudu") include(":sdks:java:io:mongodb") From e64deb3452f819d0434b9940954250e64b081a42 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 5 Mar 2025 13:08:08 +0000 Subject: [PATCH 2/9] Suppress UUF_UNUSED_FIELD warning --- .../main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java index 13c688408510..eaa651bcfa96 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -135,6 +136,7 @@ static Map getOffsetConsumerConfig( * object header: 4, 8, 12 or 16 bytes * alignment: at least 8 bytes */ + @SuppressFBWarnings("UUF_UNUSED_FIELD") private static class MovingAvgPadding { byte p000, p001, p002, p003, p004, p005, p006, p007; byte p010, p011, p012, p013, p014, p015, p016, p017; @@ -183,6 +185,7 @@ protected long incrementAndGetNumUpdates() { * Visibility and ordering of non-volatile loads/stores on numUpdates is guaranteed by volatile loads/stores on avg. * Sanity of visibility is only useful when the writer thread changes since avg is the only field that can be shared between multiple concurrent threads. */ + @SuppressFBWarnings("UUF_UNUSED_FIELD") static class MovingAvg extends MovingAvgFields { byte p100, p101, p102, p103, p104, p105, p106, p107; byte p110, p111, p112, p113, p114, p115, p116, p117; From 4dbaa22a819810df1c4993ccdfb55dee816c0816 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 5 Mar 2025 13:23:51 +0000 Subject: [PATCH 3/9] Add package-info.java --- .../beam/sdk/io/kafka/package-info.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java new file mode 100644 index 000000000000..74e682c19117 --- /dev/null +++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Transforms for reading and writing from Apache Kafka. */ +package org.apache.beam.sdk.io.kafka.jmh; From f13cf6182725d559c0dcec3d98d9082e78e6b8cc Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 5 Mar 2025 13:25:48 +0000 Subject: [PATCH 4/9] Fix sloppy copy/paste --- .../main/java/org/apache/beam/sdk/io/kafka/package-info.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java index 74e682c19117..bfdefa2be4ec 100644 --- a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java +++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java @@ -16,5 +16,5 @@ * limitations under the License. */ -/** Transforms for reading and writing from Apache Kafka. */ +/** Benchmarks for KafkaIO. */ package org.apache.beam.sdk.io.kafka.jmh; From f4194cb6756f0e7820a759f09a5b5864b1b8ec5d Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 5 Mar 2025 14:02:34 +0000 Subject: [PATCH 5/9] Move JMH content to jmh package --- .../beam/sdk/io/kafka/{ => jmh}/KafkaIOUtilsBenchmark.java | 2 +- .../org/apache/beam/sdk/io/kafka/{ => jmh}/package-info.java | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/{ => jmh}/KafkaIOUtilsBenchmark.java (98%) rename sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/{ => jmh}/package-info.java (100%) diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java similarity index 98% rename from sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java rename to sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java index cd216b5460f0..55e499c77a59 100644 --- a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtilsBenchmark.java +++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.sdk.io.kafka; +package org.apache.beam.sdk.io.kafka.jmh; import java.util.Random; import java.util.concurrent.TimeUnit; diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/package-info.java similarity index 100% rename from sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/package-info.java rename to sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/package-info.java From 3ffeb799e1486d24117a018b855dc434ad411a29 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 5 Mar 2025 14:24:11 +0000 Subject: [PATCH 6/9] Change MovingAvg's class and method visibility --- .../apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java | 1 + .../java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java index 55e499c77a59..26488a617762 100644 --- a/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java +++ b/sdks/java/io/kafka/jmh/src/main/java/org/apache/beam/sdk/io/kafka/jmh/KafkaIOUtilsBenchmark.java @@ -19,6 +19,7 @@ import java.util.Random; import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.io.kafka.KafkaIOUtils; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Group; diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java index eaa651bcfa96..ca280766de14 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java @@ -186,7 +186,7 @@ protected long incrementAndGetNumUpdates() { * Sanity of visibility is only useful when the writer thread changes since avg is the only field that can be shared between multiple concurrent threads. 
*/ @SuppressFBWarnings("UUF_UNUSED_FIELD") - static class MovingAvg extends MovingAvgFields { + public static final class MovingAvg extends MovingAvgFields { byte p100, p101, p102, p103, p104, p105, p106, p107; byte p110, p111, p112, p113, p114, p115, p116, p117; byte p120, p121, p122, p123, p124, p125, p126, p127; @@ -194,7 +194,7 @@ static class MovingAvg extends MovingAvgFields { byte p140, p141, p142, p143, p144, p145, p146, p147; byte p150, p151, p152, p153, p154, p155, p156, p157; - void update(final double quantity) { + public void update(final double quantity) { final double prevAvg = getAvg(); // volatile load (acquire) final long nextNumUpdates = incrementAndGetNumUpdates(); // normal load/store @@ -203,7 +203,7 @@ void update(final double quantity) { setAvg(nextAvg); // ordered store (release) } - double get() { + public double get() { return getAvg(); // volatile load (acquire) } } From ab5b88d7804f952519171d1dcffbeaa2c5fc5ada Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Thu, 6 Mar 2025 22:32:33 +0000 Subject: [PATCH 7/9] Remove comment --- .../java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 7d735d9a8b0a..494df8e55951 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -481,8 +481,6 @@ public TopicPartition topicPartition() { // Update consumedOffset and avgRecordSize void recordConsumed(long offset, int size) { nextOffset = offset + 1; - - // This is always updated from single thread. Probably not worth making atomic. avgRecordSize.update(size); } From 275d39a25c5b3c6022fbd470637c4b2f7efdebcf Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Thu, 6 Mar 2025 22:14:50 +0000 Subject: [PATCH 8/9] Remove obsolete offset within range check --- .../sdk/io/kafka/KafkaUnboundedReader.java | 16 +------- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 37 +------------------ .../sdk/io/kafka/ReadFromKafkaDoFnTest.java | 10 ++--- 3 files changed, 7 insertions(+), 56 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 494df8e55951..02cd70b59fe2 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -151,7 +151,6 @@ public boolean start() throws IOException { @Override public boolean advance() throws IOException { /* Read first record (if any). we need to loop here because : - * - (a) some records initially need to be skipped if they are before consumedOffset * - (b) if curBatch is empty, we want to fetch next batch and then advance. * - (c) curBatch is an iterator of iterators. we interleave the records from each. * curBatch.next() might return an empty iterator. 
@@ -173,19 +172,6 @@ public boolean advance() throws IOException { elementsReadBySplit.inc(); ConsumerRecord rawRecord = pState.recordIter.next(); - long expected = pState.nextOffset; - long offset = rawRecord.offset(); - - if (offset < expected) { // -- (a) - // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10) - // should we check if the offset is way off from consumedOffset (say > 1M)? - LOG.warn( - "{}: ignoring already consumed offset {} for {}", - this, - offset, - pState.topicPartition); - continue; - } // Apply user deserializers. User deserializers might throw, which will be propagated up // and 'curRecord' remains unchanged. The runner should close this reader. @@ -212,7 +198,7 @@ public boolean advance() throws IOException { int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) + (rawRecord.value() == null ? 0 : rawRecord.value().length); - pState.recordConsumed(offset, recordSize); + pState.recordConsumed(rawRecord.offset(), recordSize); bytesRead.inc(recordSize); bytesReadBySplit.inc(recordSize); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 17b54e1beaf3..dcc1d24d09c6 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -441,12 +441,9 @@ public ProcessContinuation processElement( try (Consumer consumer = consumerFactoryFn.apply(updatedConsumerConfig)) { ConsumerSpEL.evaluateAssign( consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition())); - long startOffset = tracker.currentRestriction().getFrom(); - long expectedOffset = startOffset; - consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset); + long expectedOffset = tracker.currentRestriction().getFrom(); + consumer.seek(kafkaSourceDescriptor.getTopicPartition(), expectedOffset); ConsumerRecords rawRecords = ConsumerRecords.empty(); - long skippedRecords = 0L; - final Stopwatch sw = Stopwatch.createStarted(); while (true) { // Fetch the record size accumulator. @@ -466,36 +463,6 @@ public ProcessContinuation processElement( return ProcessContinuation.resume(); } for (ConsumerRecord rawRecord : rawRecords) { - // If the Kafka consumer returns a record with an offset that is already processed - // the record can be safely skipped. This is needed because there is a possibility - // that the seek() above fails to move the offset to the desired position. In which - // case poll() would return records that are already cnsumed. - if (rawRecord.offset() < startOffset) { - // If the start offset is not reached even after skipping the records for 10 seconds - // then the processing is stopped with a backoff to give the Kakfa server some time - // catch up. - if (sw.elapsed().getSeconds() > 10L) { - LOG.error( - "The expected offset ({}) was not reached even after" - + " skipping consumed records for 10 seconds. The offset we could" - + " reach was {}. 
The processing of this bundle will be attempted" - + " at a later time.", - expectedOffset, - rawRecord.offset()); - return ProcessContinuation.resume() - .withResumeDelay(org.joda.time.Duration.standardSeconds(10L)); - } - skippedRecords++; - continue; - } - if (skippedRecords > 0L) { - LOG.warn( - "{} records were skipped due to seek returning an" - + " earlier position than requested position of {}", - skippedRecords, - expectedOffset); - skippedRecords = 0L; - } if (!tracker.tryClaim(rawRecord.offset())) { return ProcessContinuation.stop(); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index cbff0f896619..6f7a185f0ebc 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.nio.charset.StandardCharsets; @@ -523,12 +524,9 @@ public void testProcessElementWithEarlierOffset() throws Exception { new OffsetRangeTracker(new OffsetRange(startOffset, startOffset + 3)); KafkaSourceDescriptor descriptor = KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null); - ProcessContinuation result = - dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver); - assertEquals(ProcessContinuation.stop(), result); - assertEquals( - createExpectedRecords(descriptor, startOffset, 3, "key", "value"), - receiver.getGoodRecords()); + assertThrows( + IllegalArgumentException.class, + () -> dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver)); } @Test From a3cbc4f317b66a217ef7ea22998d2664e5178914 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Thu, 6 Mar 2025 23:40:53 +0000 Subject: [PATCH 9/9] Ensure watermark updates when position advances --- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 72 ++++++++----------- .../sdk/io/kafka/ReadFromKafkaDoFnTest.java | 6 +- 2 files changed, 33 insertions(+), 45 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index dcc1d24d09c6..0c846e65adb9 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import java.math.MathContext; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -55,7 +56,6 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; @@ -140,8 +140,8 @@ * {@link ReadFromKafkaDoFn} will 
stop reading from any removed {@link TopicPartition} automatically * by querying Kafka {@link Consumer} APIs. Please note that stopping reading may not happen as soon * as the {@link TopicPartition} is removed. For example, the removal could happen at the same time - * when {@link ReadFromKafkaDoFn} performs a {@link Consumer#poll(java.time.Duration)}. In that - * case, the {@link ReadFromKafkaDoFn} will still output the fetched records. + * when {@link ReadFromKafkaDoFn} performs a {@link Consumer#poll(Duration)}. In that case, the + * {@link ReadFromKafkaDoFn} will still output the fetched records. * *

 * <h4>Stop Reading from Stopped {@link TopicPartition}</h4>
* @@ -199,11 +199,11 @@ private ReadFromKafkaDoFn( this.checkStopReadingFn = transform.getCheckStopReadingFn(); this.badRecordRouter = transform.getBadRecordRouter(); this.recordTag = recordTag; - if (transform.getConsumerPollingTimeout() > 0) { - this.consumerPollingTimeout = transform.getConsumerPollingTimeout(); - } else { - this.consumerPollingTimeout = DEFAULT_KAFKA_POLL_TIMEOUT; - } + this.consumerPollingTimeout = + Duration.ofSeconds( + transform.getConsumerPollingTimeout() > 0 + ? transform.getConsumerPollingTimeout() + : DEFAULT_KAFKA_POLL_TIMEOUT); } private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class); @@ -248,7 +248,7 @@ private static final class SharedStateHolder { private transient @Nullable LoadingCache avgRecordSizeCache; private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L; - @VisibleForTesting final long consumerPollingTimeout; + @VisibleForTesting final Duration consumerPollingTimeout; @VisibleForTesting final DeserializerProvider keyDeserializerProvider; @VisibleForTesting final DeserializerProvider valueDeserializerProvider; @VisibleForTesting final Map consumerConfig; @@ -443,15 +443,17 @@ public ProcessContinuation processElement( consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition())); long expectedOffset = tracker.currentRestriction().getFrom(); consumer.seek(kafkaSourceDescriptor.getTopicPartition(), expectedOffset); - ConsumerRecords rawRecords = ConsumerRecords.empty(); while (true) { // Fetch the record size accumulator. final MovingAvg avgRecordSize = avgRecordSizeCache.getUnchecked(kafkaSourceDescriptor); - rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition()); - // When there are no records available for the current TopicPartition, self-checkpoint - // and move to process the next element. - if (rawRecords.isEmpty()) { + // Fetch the next records. + final ConsumerRecords rawRecords = + consumer.poll(this.consumerPollingTimeout); + + // No progress when the polling timeout expired. + // Self-checkpoint and move to process the next element. + if (rawRecords == ConsumerRecords.empty()) { if (!topicPartitionExists( kafkaSourceDescriptor.getTopicPartition(), consumer.partitionsFor(kafkaSourceDescriptor.getTopic()))) { @@ -462,6 +464,9 @@ public ProcessContinuation processElement( } return ProcessContinuation.resume(); } + + // Visible progress within the consumer polling timeout. + // Partially or fully claim and process records in this batch. for (ConsumerRecord rawRecord : rawRecords) { if (!tracker.tryClaim(rawRecord.offset())) { return ProcessContinuation.stop(); @@ -512,6 +517,17 @@ public ProcessContinuation processElement( } } + // Non-visible progress within the consumer polling timeout. + // Claim up to the current position. 
+ if (expectedOffset < (expectedOffset = consumer.position(topicPartition))) { + if (!tracker.tryClaim(expectedOffset - 1)) { + return ProcessContinuation.stop(); + } + if (timestampPolicy != null) { + updateWatermarkManually(timestampPolicy, watermarkEstimator, tracker); + } + } + backlogBytes.set( (long) (BigDecimal.valueOf( @@ -531,34 +547,6 @@ private boolean topicPartitionExists( .anyMatch(partitionInfo -> partitionInfo.partition() == (topicPartition.partition())); } - // see https://github.com/apache/beam/issues/25962 - private ConsumerRecords poll( - Consumer consumer, TopicPartition topicPartition) { - final Stopwatch sw = Stopwatch.createStarted(); - long previousPosition = -1; - java.time.Duration elapsed = java.time.Duration.ZERO; - java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout); - while (true) { - final ConsumerRecords rawRecords = consumer.poll(timeout.minus(elapsed)); - if (!rawRecords.isEmpty()) { - // return as we have found some entries - return rawRecords; - } - if (previousPosition == (previousPosition = consumer.position(topicPartition))) { - // there was no progress on the offset/position, which indicates end of stream - return rawRecords; - } - elapsed = sw.elapsed(); - if (elapsed.toMillis() >= timeout.toMillis()) { - // timeout is over - LOG.warn( - "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.", - consumerPollingTimeout); - return rawRecords; - } - } - } - private TimestampPolicyContext updateWatermarkManually( TimestampPolicy timestampPolicy, WatermarkEstimator watermarkEstimator, diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 6f7a185f0ebc..d12a04972633 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -715,14 +715,14 @@ public void testUnbounded() { @Test public void testConstructorWithPollTimeout() { ReadSourceDescriptors descriptors = makeReadSourceDescriptor(consumer); - // default poll timeout = 1 scond + // default poll timeout = 2 seconds ReadFromKafkaDoFn dofnInstance = ReadFromKafkaDoFn.create(descriptors, RECORDS); - Assert.assertEquals(2L, dofnInstance.consumerPollingTimeout); + Assert.assertEquals(Duration.ofSeconds(2L), dofnInstance.consumerPollingTimeout); // updated timeout = 5 seconds descriptors = descriptors.withConsumerPollingTimeout(5L); ReadFromKafkaDoFn dofnInstanceNew = ReadFromKafkaDoFn.create(descriptors, RECORDS); - Assert.assertEquals(5L, dofnInstanceNew.consumerPollingTimeout); + Assert.assertEquals(Duration.ofSeconds(5L), dofnInstanceNew.consumerPollingTimeout); } private BoundednessVisitor testBoundedness(