KAFKA-806: Index may not always observe log.index.interval.bytes (#18012)

Currently, each log.append() adds at most one index entry, even when the appended data is larger than log.index.interval.bytes. One potential issue: if a follower restarts after being down for a long time, it may fetch much more than log.index.interval.bytes of data in a single append. Fewer index entries are then created than the interval calls for, which can increase fetch times for consumers.
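
As a rough illustration, here is a minimal sketch of the behavioral change (all names below are invented for the sketch, not the actual LogSegment code): the index-interval check moves from once per append call to once per record batch.

import java.util.List;

class IndexIntervalSketch {
    static final int INDEX_INTERVAL_BYTES = 4096; // stand-in for log.index.interval.bytes
    record Batch(long lastOffset, int sizeInBytes) { }

    long bytesSinceLastIndexEntry = 0;
    long physicalPosition = 0;

    // Old behavior: one interval check per append() call, so a single large
    // append (e.g. a follower catching up after a long outage) produced at
    // most one index entry no matter how many bytes it carried.
    void appendOnePerCall(long lastOffset, int appendedBytes) {
        if (bytesSinceLastIndexEntry > INDEX_INTERVAL_BYTES) {
            addIndexEntry(lastOffset);
            bytesSinceLastIndexEntry = 0;
        }
        physicalPosition += appendedBytes;
        bytesSinceLastIndexEntry += appendedBytes;
    }

    // New behavior: the check runs per record batch, so a large append still
    // yields roughly one index entry per INDEX_INTERVAL_BYTES of data.
    void appendOnePerBatch(List<Batch> batches) {
        for (Batch batch : batches) {
            if (bytesSinceLastIndexEntry > INDEX_INTERVAL_BYTES) {
                addIndexEntry(batch.lastOffset());
                bytesSinceLastIndexEntry = 0;
            }
            physicalPosition += batch.sizeInBytes();
            bytesSinceLastIndexEntry += batch.sizeInBytes();
        }
    }

    void addIndexEntry(long offset) {
        System.out.printf("index entry: offset=%d position=%d%n", offset, physicalPosition);
    }
}

With the per-batch check, index density tracks the configured interval regardless of how many bytes a single append carries.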

Reviewers: Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>
FrankYang0529 authored Jan 17, 2025
1 parent a6faec1 commit e124d39
Showing 13 changed files with 181 additions and 177 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogCleaner.scala
@@ -824,7 +824,7 @@ private[log] class Cleaner(val id: Int,
val retained = MemoryRecords.readableRecords(outputBuffer)
// it's OK not to hold the Log's lock in this case, because this segment is only accessed by other threads
// after `Log.replaceSegments` (which acquires the lock) is called
- dest.append(result.maxOffset, result.maxTimestamp, result.shallowOffsetOfMaxTimestamp(), retained)
+ dest.append(result.maxOffset, retained)
throttler.maybeThrottle(outputBuffer.limit())
}

5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -791,7 +791,6 @@ class UnifiedLog(@volatile var logStartOffset: Long,

validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
- appendInfo.setShallowOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp)
appendInfo.setLastOffset(offset.value - 1)
appendInfo.setRecordValidationStats(validateAndOffsetAssignResult.recordValidationStats)
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
@@ -877,7 +876,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// will be cleaned up after the log directory is recovered. Note that the end offset of the
// ProducerStateManager will not be updated and the last stable offset will not advance
// if the append to the transaction index fails.
- localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.shallowOffsetOfMaxTimestamp, validRecords)
+ localLog.append(appendInfo.lastOffset, validRecords)
updateHighWatermarkWithLogEndOffset()

// update the producer state
@@ -1158,7 +1157,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
else
OptionalInt.empty()

- new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp, shallowOffsetOfMaxTimestamp,
+ new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp,
RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression,
validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList[RecordError], LeaderHwChange.NONE)
}
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -4409,8 +4409,8 @@ class UnifiedLogTest {
segments.add(seg2)
assertEquals(Seq(Long.MaxValue, Long.MaxValue), log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)

- seg1.append(1, 1000L, 1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord("one".getBytes)))
- seg2.append(2, 2000L, 1, MemoryRecords.withRecords(2, Compression.NONE, new SimpleRecord("two".getBytes)))
+ seg1.append(1, MemoryRecords.withRecords(1, Compression.NONE, new SimpleRecord(1000L, "one".getBytes)))
+ seg2.append(2, MemoryRecords.withRecords(2, Compression.NONE, new SimpleRecord(2000L, "two".getBytes)))
assertEquals(Seq(1000L, 2000L), log.getFirstBatchTimestampForSegments(segments).asScala.toSeq)

seg1.close()

@@ -107,7 +107,6 @@ class MockFetcherThread(val mockLeader: MockLeaderEndPoint,
lastOffset,
lastEpoch,
maxTimestamp,
- shallowOffsetOfMaxTimestamp,
Time.SYSTEM.milliseconds(),
state.logStartOffset,
RecordValidationStats.EMPTY,

@@ -463,7 +463,6 @@ class ReplicaFetcherThreadTest {
0,
OptionalInt.empty,
RecordBatch.NO_TIMESTAMP,
- -1L,
RecordBatch.NO_TIMESTAMP,
-1L,
RecordValidationStats.EMPTY,

@@ -36,7 +36,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.metadata.{PartitionChangeRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, ObjectSerializationCache}
- import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordBatch, RecordVersion, SimpleRecord}
+ import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, Record, RecordVersion, SimpleRecord}
import org.apache.kafka.common.utils.{Exit, Utils}
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
@@ -402,7 +402,7 @@ class DumpLogSegmentsTest {
log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, time.scheduler, time)
log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*), leaderEpoch = 0)
val secondSegment = log.roll()
- secondSegment.append(1L, RecordBatch.NO_TIMESTAMP, 1L, MemoryRecords.withRecords(Compression.NONE, metadataRecords:_*))
+ secondSegment.append(1L, MemoryRecords.withRecords(Compression.NONE, metadataRecords: _*))
secondSegment.flush()
log.flush(true)


@@ -524,8 +524,8 @@ else if (segment.baseOffset() == maxOffsetMetadata.segmentBaseOffset && !maxOffs
);
}

- public void append(long lastOffset, long largestTimestamp, long shallowOffsetOfMaxTimestamp, MemoryRecords records) throws IOException {
- segments.activeSegment().append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records);
+ public void append(long lastOffset, MemoryRecords records) throws IOException {
+ segments.activeSegment().append(lastOffset, records);
updateLogEndOffset(lastOffset + 1);
}

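
For illustration, a hedged usage sketch of the slimmed-down signature (the localLog handle and the offset/record values are assumed, not taken from this commit); the timestamp bookkeeping that callers used to pass in is now handled inside the segment:

import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;

// Hypothetical caller; assumes an initialized LocalLog named localLog.
MemoryRecords records = MemoryRecords.withRecords(41L, Compression.NONE,
    new SimpleRecord(System.currentTimeMillis(), "value".getBytes()));
localLog.append(41L, records); // last offset of the appended batch; no timestamp arguments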

@@ -31,13 +31,12 @@
public class LogAppendInfo {

public static final LogAppendInfo UNKNOWN_LOG_APPEND_INFO = new LogAppendInfo(-1, -1, OptionalInt.empty(),
- RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
+ RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, -1L,
RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);

private long firstOffset;
private long lastOffset;
private long maxTimestamp;
- private long shallowOffsetOfMaxTimestamp;
private long logAppendTime;
private long logStartOffset;
private RecordValidationStats recordValidationStats;
@@ -52,59 +51,55 @@ public class LogAppendInfo {
/**
* Creates an instance with the given params.
*
- * @param firstOffset                  The first offset in the message set unless the message format is less than V2 and we are appending
- *                                     to the follower.
- * @param lastOffset                   The last offset in the message set
- * @param lastLeaderEpoch              The partition leader epoch corresponding to the last offset, if available.
- * @param maxTimestamp                 The maximum timestamp of the message set.
- * @param shallowOffsetOfMaxTimestamp  The last offset of the batch with the maximum timestamp.
- * @param logAppendTime                The log append time (if used) of the message set, otherwise Message.NoTimestamp
- * @param logStartOffset               The start offset of the log at the time of this append.
- * @param recordValidationStats        Statistics collected during record processing, `null` if `assignOffsets` is `false`
- * @param sourceCompression            The source codec used in the message set (send by the producer)
- * @param validBytes                   The number of valid bytes
- * @param lastOffsetOfFirstBatch       The last offset of the first batch
+ * @param firstOffset            The first offset in the message set unless the message format is less than V2 and we are appending
+ *                               to the follower.
+ * @param lastOffset             The last offset in the message set
+ * @param lastLeaderEpoch        The partition leader epoch corresponding to the last offset, if available.
+ * @param maxTimestamp           The maximum timestamp of the message set.
+ * @param logAppendTime          The log append time (if used) of the message set, otherwise Message.NoTimestamp
+ * @param logStartOffset         The start offset of the log at the time of this append.
+ * @param recordValidationStats  Statistics collected during record processing, `null` if `assignOffsets` is `false`
+ * @param sourceCompression      The source codec used in the message set (send by the producer)
+ * @param validBytes             The number of valid bytes
+ * @param lastOffsetOfFirstBatch The last offset of the first batch
*/
public LogAppendInfo(long firstOffset,
long lastOffset,
OptionalInt lastLeaderEpoch,
long maxTimestamp,
- long shallowOffsetOfMaxTimestamp,
long logAppendTime,
long logStartOffset,
RecordValidationStats recordValidationStats,
CompressionType sourceCompression,
int validBytes,
long lastOffsetOfFirstBatch) {
- this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset,
+ this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, logAppendTime, logStartOffset,
recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(),
LeaderHwChange.NONE);
}

/**
* Creates an instance with the given params.
*
- * @param firstOffset                  The first offset in the message set unless the message format is less than V2 and we are appending
- *                                     to the follower.
- * @param lastOffset                   The last offset in the message set
- * @param lastLeaderEpoch              The partition leader epoch corresponding to the last offset, if available.
- * @param maxTimestamp                 The maximum timestamp of the message set.
- * @param shallowOffsetOfMaxTimestamp  The last offset of the batch with the maximum timestamp.
- * @param logAppendTime                The log append time (if used) of the message set, otherwise Message.NoTimestamp
- * @param logStartOffset               The start offset of the log at the time of this append.
- * @param recordValidationStats        Statistics collected during record processing, `null` if `assignOffsets` is `false`
- * @param sourceCompression            The source codec used in the message set (send by the producer)
- * @param validBytes                   The number of valid bytes
- * @param lastOffsetOfFirstBatch       The last offset of the first batch
- * @param recordErrors                 List of record errors that caused the respective batch to be dropped
- * @param leaderHwChange               Incremental if the high watermark needs to be increased after appending record
- *                                     Same if high watermark is not changed. None is the default value and it means append failed
+ * @param firstOffset            The first offset in the message set unless the message format is less than V2 and we are appending
+ *                               to the follower.
+ * @param lastOffset             The last offset in the message set
+ * @param lastLeaderEpoch        The partition leader epoch corresponding to the last offset, if available.
+ * @param maxTimestamp           The maximum timestamp of the message set.
+ * @param logAppendTime          The log append time (if used) of the message set, otherwise Message.NoTimestamp
+ * @param logStartOffset         The start offset of the log at the time of this append.
+ * @param recordValidationStats  Statistics collected during record processing, `null` if `assignOffsets` is `false`
+ * @param sourceCompression      The source codec used in the message set (send by the producer)
+ * @param validBytes             The number of valid bytes
+ * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param recordErrors           List of record errors that caused the respective batch to be dropped
+ * @param leaderHwChange         Incremental if the high watermark needs to be increased after appending record
+ *                               Same if high watermark is not changed. None is the default value and it means append failed
*/
public LogAppendInfo(long firstOffset,
long lastOffset,
OptionalInt lastLeaderEpoch,
long maxTimestamp,
- long shallowOffsetOfMaxTimestamp,
long logAppendTime,
long logStartOffset,
RecordValidationStats recordValidationStats,
@@ -117,7 +112,6 @@ public LogAppendInfo(long firstOffset,
this.lastOffset = lastOffset;
this.lastLeaderEpoch = lastLeaderEpoch;
this.maxTimestamp = maxTimestamp;
- this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
this.logAppendTime = logAppendTime;
this.logStartOffset = logStartOffset;
this.recordValidationStats = recordValidationStats;
@@ -156,14 +150,6 @@ public void setMaxTimestamp(long maxTimestamp) {
this.maxTimestamp = maxTimestamp;
}

- public long shallowOffsetOfMaxTimestamp() {
-     return shallowOffsetOfMaxTimestamp;
- }
-
- public void setShallowOffsetOfMaxTimestamp(long shallowOffsetOfMaxTimestamp) {
-     this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
- }
-
public long logAppendTime() {
return logAppendTime;
}
@@ -233,12 +219,12 @@ public long numMessages() {
* @return a new instance with the given LeaderHwChange
*/
public LogAppendInfo copy(LeaderHwChange newLeaderHwChange) {
- return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, shallowOffsetOfMaxTimestamp, logAppendTime, logStartOffset, recordValidationStats,
+ return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, logAppendTime, logStartOffset, recordValidationStats,
sourceCompression, validBytes, lastOffsetOfFirstBatch, recordErrors, newLeaderHwChange);
}

public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long logStartOffset) {
- return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+ return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, logStartOffset,
RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L);
}

@@ -248,7 +234,7 @@ public static LogAppendInfo unknownLogAppendInfoWithLogStartOffset(long logStart
* in unknownLogAppendInfoWithLogStartOffset, but with additional fields recordErrors
*/
public static LogAppendInfo unknownLogAppendInfoWithAdditionalInfo(long logStartOffset, List<RecordError> recordErrors) {
- return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, logStartOffset,
+ return new LogAppendInfo(-1, -1, OptionalInt.empty(), RecordBatch.NO_TIMESTAMP, RecordBatch.NO_TIMESTAMP, logStartOffset,
RecordValidationStats.EMPTY, CompressionType.NONE, -1, -1L, recordErrors, LeaderHwChange.NONE);
}

@@ -259,7 +245,6 @@ public String toString() {
", lastOffset=" + lastOffset +
", lastLeaderEpoch=" + lastLeaderEpoch +
", maxTimestamp=" + maxTimestamp +
", shallowOffsetOfMaxTimestamp=" + shallowOffsetOfMaxTimestamp +
", logAppendTime=" + logAppendTime +
", logStartOffset=" + logStartOffset +
", recordConversionStats=" + recordValidationStats +
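
For reference, a hedged sketch of constructing a LogAppendInfo against the slimmed-down overload (all values are illustrative; there is no longer a shallowOffsetOfMaxTimestamp argument after maxTimestamp):

import java.util.OptionalInt;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
// LogAppendInfo and RecordValidationStats imports as in the file above.

LogAppendInfo info = new LogAppendInfo(
    0L,                       // firstOffset
    9L,                       // lastOffset
    OptionalInt.of(5),        // lastLeaderEpoch
    RecordBatch.NO_TIMESTAMP, // maxTimestamp
    RecordBatch.NO_TIMESTAMP, // logAppendTime
    0L,                       // logStartOffset
    RecordValidationStats.EMPTY,
    CompressionType.NONE,
    1024,                     // validBytes
    9L);                      // lastOffsetOfFirstBatch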
