
Commit

Remove obsolete offset within range check
sjvanrossum committed Mar 6, 2025
1 parent ab5b88d commit 275d39a
Showing 3 changed files with 7 additions and 56 deletions.
@@ -151,7 +151,6 @@ public boolean start() throws IOException {
@Override
public boolean advance() throws IOException {
/* Read first record (if any). We need to loop here because:
* - (a) some records initially need to be skipped if they are before consumedOffset
* - (b) if curBatch is empty, we want to fetch next batch and then advance.
* - (c) curBatch is an iterator of iterators. we interleave the records from each.
* curBatch.next() might return an empty iterator.
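
To make points (b) and (c) above concrete, here is a minimal standalone sketch of the same loop shape, with plain Integer records standing in for ConsumerRecords and a pre-built batch instead of a Kafka fetch; it is an illustration of the pattern, not the reader's actual code.

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

public class InterleaveSketch {
  private final Iterator<Iterator<Integer>> curBatch;
  private Iterator<Integer> recordIter = Collections.emptyIterator();

  InterleaveSketch(Iterator<Iterator<Integer>> curBatch) {
    this.curBatch = curBatch;
  }

  // Keep looping until a record is found or the batch is exhausted; a real reader
  // would fetch the next batch at that point -- (b).
  Integer advance() {
    while (true) {
      if (recordIter.hasNext()) {
        return recordIter.next();
      }
      if (!curBatch.hasNext()) {
        return null;
      }
      recordIter = curBatch.next(); // may be an empty iterator -- (c), so loop again
    }
  }

  public static void main(String[] args) {
    Iterator<Iterator<Integer>> batch =
        Arrays.asList(Collections.<Integer>emptyIterator(), Arrays.asList(1, 2).iterator())
            .iterator();
    InterleaveSketch reader = new InterleaveSketch(batch);
    System.out.println(reader.advance()); // 1
    System.out.println(reader.advance()); // 2
    System.out.println(reader.advance()); // null
  }
}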
@@ -173,19 +172,6 @@ public boolean advance() throws IOException {
elementsReadBySplit.inc();

ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
long expected = pState.nextOffset;
long offset = rawRecord.offset();

if (offset < expected) { // -- (a)
// this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
// should we check if the offset is way off from consumedOffset (say > 1M)?
LOG.warn(
"{}: ignoring already consumed offset {} for {}",
this,
offset,
pState.topicPartition);
continue;
}

// Apply user deserializers. User deserializers might throw, which will be propagated up
// and 'curRecord' remains unchanged. The runner should close this reader.
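
The comment above notes that user deserializers are applied to the raw bytes and that any exception they throw simply propagates. A small illustration of that step using the plain Kafka client API follows; the topic name, bytes, and the StringDeserializer choice are placeholders for this sketch, not Beam's actual wiring.

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DeserializeSketch {
  public static void main(String[] args) {
    ConsumerRecord<byte[], byte[]> rawRecord =
        new ConsumerRecord<>(
            "topic", 0, 42L,
            "key".getBytes(StandardCharsets.UTF_8),
            "value".getBytes(StandardCharsets.UTF_8));
    StringDeserializer deserializer = new StringDeserializer();
    // deserialize() may throw; here, as in the reader, the exception would propagate.
    String key = deserializer.deserialize(rawRecord.topic(), rawRecord.key());
    String value = deserializer.deserialize(rawRecord.topic(), rawRecord.value());
    System.out.println(key + " -> " + value + " @ offset " + rawRecord.offset());
  }
}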
@@ -212,7 +198,7 @@ public boolean advance() throws IOException {
int recordSize =
(rawRecord.key() == null ? 0 : rawRecord.key().length)
+ (rawRecord.value() == null ? 0 : rawRecord.value().length);
pState.recordConsumed(offset, recordSize);
pState.recordConsumed(rawRecord.offset(), recordSize);
bytesRead.inc(recordSize);
bytesReadBySplit.inc(recordSize);

@@ -441,12 +441,9 @@ public ProcessContinuation processElement(
try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
ConsumerSpEL.evaluateAssign(
consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
long startOffset = tracker.currentRestriction().getFrom();
long expectedOffset = startOffset;
consumer.seek(kafkaSourceDescriptor.getTopicPartition(), startOffset);
long expectedOffset = tracker.currentRestriction().getFrom();
consumer.seek(kafkaSourceDescriptor.getTopicPartition(), expectedOffset);
ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
long skippedRecords = 0L;
final Stopwatch sw = Stopwatch.createStarted();

while (true) {
// Fetch the record size accumulator.
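
As a rough standalone equivalent of the assign/seek/poll sequence in this hunk, the sketch below uses the plain Kafka consumer API rather than Beam's ConsumerSpEL helper; the broker address, topic, partition, and offset are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SeekAndPollSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    TopicPartition tp = new TopicPartition("topic", 0); // placeholder
    long expectedOffset = 100L; // e.g. the restriction's starting offset
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(tp));
      consumer.seek(tp, expectedOffset);
      ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
        System.out.println(rawRecord.offset());
      }
    }
  }
}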
@@ -466,36 +463,6 @@ public ProcessContinuation processElement(
return ProcessContinuation.resume();
}
for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
// If the Kafka consumer returns a record with an offset that is already processed,
// the record can be safely skipped. This is needed because there is a possibility
// that the seek() above fails to move the offset to the desired position, in which
// case poll() would return records that are already consumed.
if (rawRecord.offset() < startOffset) {
// If the start offset is not reached even after skipping the records for 10 seconds,
// then the processing is stopped with a backoff to give the Kafka server some time to
// catch up.
if (sw.elapsed().getSeconds() > 10L) {
LOG.error(
"The expected offset ({}) was not reached even after"
+ " skipping consumed records for 10 seconds. The offset we could"
+ " reach was {}. The processing of this bundle will be attempted"
+ " at a later time.",
expectedOffset,
rawRecord.offset());
return ProcessContinuation.resume()
.withResumeDelay(org.joda.time.Duration.standardSeconds(10L));
}
skippedRecords++;
continue;
}
if (skippedRecords > 0L) {
LOG.warn(
"{} records were skipped due to seek returning an"
+ " earlier position than the requested position of {}",
skippedRecords,
expectedOffset);
skippedRecords = 0L;
}
if (!tracker.tryClaim(rawRecord.offset())) {
return ProcessContinuation.stop();
}
@@ -19,6 +19,7 @@

import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.nio.charset.StandardCharsets;
@@ -523,12 +524,9 @@ public void testProcessElementWithEarlierOffset() throws Exception {
new OffsetRangeTracker(new OffsetRange(startOffset, startOffset + 3));
KafkaSourceDescriptor descriptor =
KafkaSourceDescriptor.of(topicPartition, null, null, null, null, null);
ProcessContinuation result =
dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver);
assertEquals(ProcessContinuation.stop(), result);
assertEquals(
createExpectedRecords(descriptor, startOffset, 3, "key", "value"),
receiver.getGoodRecords());
assertThrows(
IllegalArgumentException.class,
() -> dofnInstanceWithBrokenSeek.processElement(descriptor, tracker, null, receiver));
}
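
The test now expects an IllegalArgumentException because, with the manual skip removed, a record before the restriction's start reaches tracker.tryClaim() directly. Below is a minimal sketch of that tracker behavior, assuming the semantics of Beam's OffsetRangeTracker and shown in isolation from the DoFn.

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

public class TryClaimSketch {
  public static void main(String[] args) {
    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(10L, 13L));
    // Claiming an offset below the range start is rejected with IllegalArgumentException,
    // which is what the rewritten test asserts.
    tracker.tryClaim(9L);
  }
}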

@Test
