@@ -30,7 +30,7 @@
* used as the end of the range to indicate infinity.
*
* <p>An offset range is considered growable when the end offset could grow (or change) during
* execution time (e.g., Kafka topic partition offset, appended file, ...).
* execution time (e.g., appended file, ...).
*
* <p>The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
*/
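As an aside, a minimal, hedged sketch of driving such a growable range and marking it done by claiming Long.MAX_VALUE; the loop bound, the constant end-offset estimate, and the per-offset work are illustrative assumptions, only the tracker API comes from Beam.

import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;

public class GrowableRangeSketch {
  public static void main(String[] args) {
    // Growable range starting at offset 0; the end is only estimated (an illustrative constant
    // here) and may keep growing while the element is being processed.
    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L, () -> 42L);

    for (long offset = 0; offset < 10; offset++) {
      if (!tracker.tryClaim(offset)) {
        break; // would trigger if a split or checkpoint had truncated the range
      }
      // Hypothetical per-offset work would happen here.
    }

    // Claiming Long.MAX_VALUE marks the growable range as done, so checkDone() passes.
    tracker.tryClaim(Long.MAX_VALUE);
    tracker.checkDone();
  }
}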
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms.splittabledofn;

import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A {@link RestrictionTracker} that wraps another {@link RestrictionTracker} whose restriction
 * must be treated as unsplittable.
 *
 * <p>A restriction is considered unsplittable when the restrictions of an element must not be
 * processed simultaneously (e.g., a Kafka topic partition).
 */
public class UnsplittableRestrictionTracker<RestrictionT, PositionT>
    extends RestrictionTracker<RestrictionT, PositionT> implements RestrictionTracker.HasProgress {
  private final RestrictionTracker<RestrictionT, PositionT> tracker;

  public UnsplittableRestrictionTracker(RestrictionTracker<RestrictionT, PositionT> tracker) {
    this.tracker = tracker;
  }

  @Override
  public boolean tryClaim(PositionT position) {
    return tracker.tryClaim(position);
  }

  @Override
  public RestrictionT currentRestriction() {
    return tracker.currentRestriction();
  }

  @Override
  public @Nullable SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
    // Refuse dynamic (fractional) splits; only checkpoints (fractionOfRemainder == 0) and no-op
    // splits at the end of the range are delegated to the wrapped tracker.
    return fractionOfRemainder > 0.0 && fractionOfRemainder < 1.0
        ? null
        : tracker.trySplit(fractionOfRemainder);
  }

  @Override
  public void checkDone() throws IllegalStateException {
    tracker.checkDone();
  }

  @Override
  public IsBounded isBounded() {
    return tracker.isBounded();
  }

  @Override
  public Progress getProgress() {
    return tracker instanceof RestrictionTracker.HasProgress
        ? ((RestrictionTracker.HasProgress) tracker).getProgress()
        : Progress.NONE;
  }
}
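To make the trySplit contract above concrete, a small hedged usage sketch (the offsets are arbitrary; only the Beam types named in the imports are assumed): fractional dynamic splits are refused by the wrapper, while a checkpoint at fractionOfRemainder 0 is still delegated to the wrapped OffsetRangeTracker.

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.UnsplittableRestrictionTracker;

public class UnsplittableTrackerSketch {
  public static void main(String[] args) {
    UnsplittableRestrictionTracker<OffsetRange, Long> tracker =
        new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(new OffsetRange(0, 100)));

    tracker.tryClaim(10L); // claim some work first

    // A dynamic split inside the remainder is refused by the wrapper.
    System.out.println(tracker.trySplit(0.5)); // null

    // A checkpoint (fractionOfRemainder == 0) is delegated to the wrapped OffsetRangeTracker:
    // the primary becomes [0, 11) and the residual [11, 100).
    SplitResult<OffsetRange> checkpoint = tracker.trySplit(0.0);
    System.out.println(checkpoint);
  }
}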
@@ -45,6 +45,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.UnsplittableRestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -108,6 +109,14 @@
*
* <h4>Splitting</h4>
*
* <p>Consumer group members must not consume from the same {@link TopicPartition} simultaneously
* when {@code enable.auto.commit} is set. Doing so may arbitrarily overwrite a consumer group's
* committed offset for a {@link TopicPartition}. The restriction tracker for a {@link
* KafkaSourceDescriptor} is therefore wrapped in an {@link UnsplittableRestrictionTracker} (over
* {@code OffsetRange} restrictions and {@code Long} positions) and will only return a non-null
* {@link org.apache.beam.sdk.transforms.splittabledofn.SplitResult} for a checkpoint. This
* ensures consistent behavior when {@code enable.auto.commit} is set and
* prevents concurrent use of per-{@link TopicPartition} cached {@link Consumer} resources.

Contributor: nit: "ensures consistent" is too strong I think? There may still be parallel scheduling on different VMs during scaling etc.

Contributor (author): Fixed. The intent wasn't to comment on consistency guarantees, but the phrasing is off.
*
* <p>TODO(https://github.com/apache/beam/issues/20280): Add support for initial splitting.
*
* <h4>Checkpoint and Resume Processing</h4>
@@ -488,20 +497,21 @@ public double getSize(

@NewTracker
@RequiresNonNull({"latestOffsetEstimatorCache"})
public OffsetRangeTracker restrictionTracker(
public UnsplittableRestrictionTracker<OffsetRange, Long> restrictionTracker(
@Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;

if (restriction.getTo() < Long.MAX_VALUE) {
return new OffsetRangeTracker(restriction);
return new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(restriction));
}

// OffsetEstimators are cached for each topic-partition because they hold a stateful connection,
// so we want to minimize the number of connections that we start and track with Kafka. Another
// point is that the estimator memoizes the backlog, which makes its estimations more reusable.
return new GrowableOffsetRangeTracker(
restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor));
return new UnsplittableRestrictionTracker<>(
new GrowableOffsetRangeTracker(
restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor)));
}
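For illustration, a hedged sketch of the caching pattern the comment above describes, written against plain Guava (Beam itself uses a vendored copy); the String key, the eviction time, and the stub estimator are assumptions, not the actual Kafka read DoFn code.

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.TimeUnit;

public class EstimatorCacheSketch {
  // Hypothetical stand-in for KafkaLatestOffsetEstimator.
  interface LatestOffsetEstimator {
    long estimate();
  }

  public static void main(String[] args) {
    LoadingCache<String, LatestOffsetEstimator> cache =
        CacheBuilder.newBuilder()
            // Drop estimators (and the stateful connection they hold) once they go unused.
            .expireAfterAccess(1, TimeUnit.MINUTES)
            .build(
                new CacheLoader<String, LatestOffsetEstimator>() {
                  @Override
                  public LatestOffsetEstimator load(String topicPartition) {
                    // Hypothetical: open one consumer for this topic-partition and memoize
                    // its latest end offset.
                    return () -> 0L;
                  }
                });

    // Repeated lookups for the same partition reuse the same estimator (and connection).
    LatestOffsetEstimator first = cache.getUnchecked("topic-0");
    LatestOffsetEstimator again = cache.getUnchecked("topic-0");
    System.out.println(first == again); // true
  }
}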

@ProcessElement
@@ -813,6 +813,37 @@ public void testKafkaWithDelayedStopReadingFunction() {
    runWithStopReadingFn(checkStopReadingFn, "delayed-stop-reading", sourceOptions.numRecords);
  }

  @Test
  public void testKafkaWithStopReadTime() throws IOException {
    writePipeline
        .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
        .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
        .apply(
            "Write to Kafka",
            writeToKafka().withTopic(options.getKafkaTopic() + "-stop-read-time"));

    PipelineResult writeResult = writePipeline.run();
    PipelineResult.State writeState = writeResult.waitUntilFinish();
    assertNotEquals(PipelineResult.State.FAILED, writeState);

    sdfReadPipeline.getOptions().as(Options.class).setStreaming(false);
    PCollection<KafkaRecord<byte[], byte[]>> rows =
        sdfReadPipeline.apply(
            "Read from bounded Kafka",
            readFromKafka()
                .withTopic(options.getKafkaTopic() + "-stop-read-time")
                .withStopReadTime(
                    org.joda.time.Instant.ofEpochMilli(
                        new MetricsReader(writeResult, NAMESPACE)
                            .getEndTimeMetric(WRITE_TIME_METRIC_NAME))));

    PipelineResult readResult = sdfReadPipeline.run();
    PipelineResult.State readState =
        readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
    cancelIfTimeouted(readResult, readState);
    assertNotEquals(PipelineResult.State.FAILED, readState);
  }

  public static final Schema KAFKA_TOPIC_SCHEMA =
      Schema.builder()
          .addStringField("name")