From 3456cf1fd8b286f5faf6415d7eece665679dff30 Mon Sep 17 00:00:00 2001
From: Jason Taylor
Date: Fri, 17 Jan 2025 13:00:55 +0000
Subject: [PATCH] KAFKA-16368: Constraints update for segment.index.bytes

---
 .../unit/kafka/cluster/AbstractPartitionTest.scala |  2 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |  2 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  2 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |  6 +++---
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 14 +++++++-------
 .../kafka/storage/internals/log/LogConfig.java    |  2 +-
 .../storage/internals/log/LogSegmentTest.java     |  4 ++--
 .../storage/integration/AlterLogDirTest.java      | 13 +++++++++----
 .../storage/utils/TieredStorageTestUtils.java     |  2 ++
 9 files changed, 27 insertions(+), 20 deletions(-)
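Note for reviewers: the functional change is the new lower bound on segment.index.bytes in LogConfig.java (atLeast(4) becomes atLeast(1024)); the remaining hunks bump test fixtures that used smaller index sizes (12, 120, 1000) up to the new 1024-byte minimum. Below is a minimal sketch of the resulting validation behaviour; the demo class is illustrative only (not part of this patch) and assumes just the map-based LogConfig constructor and TopicConfig constant already used elsewhere in these tests:

    import java.util.Map;

    import org.apache.kafka.common.config.ConfigException;
    import org.apache.kafka.common.config.TopicConfig;
    import org.apache.kafka.storage.internals.log.LogConfig;

    public class SegmentIndexBytesFloorDemo {
        public static void main(String[] args) {
            // 1024 is the new minimum, so this construction succeeds.
            new LogConfig(Map.of(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024));
            try {
                // Values below 1024 (previously allowed down to 4) now fail
                // ConfigDef range validation when the config is instantiated.
                new LogConfig(Map.of(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000));
            } catch (ConfigException e) {
                System.out.println("Rejected as expected: " + e.getMessage());
            }
        }
    }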
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 3c8e20701b0a6..da03a01e00e3c 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -102,7 +102,7 @@ class AbstractPartitionTest {
   def createLogProperties(overrides: Map[String, String]): Properties = {
     val logProps = new Properties()
     logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
-    logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000: java.lang.Integer)
+    logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
     logProps.put(TopicConfig.RETENTION_MS_CONFIG, 999: java.lang.Integer)
     overrides.foreach { case (k, v) => logProps.put(k, v) }
     logProps
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index 6c444f0e5602e..3e6fdac1b69a9 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -365,7 +365,7 @@ class PartitionLockTest extends Logging {
   private def createLogProperties(overrides: Map[String, String]): Properties = {
     val logProps = new Properties()
     logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
-    logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000: java.lang.Integer)
+    logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
     logProps.put(TopicConfig.RETENTION_MS_CONFIG, 999: java.lang.Integer)
     overrides.foreach { case (k, v) => logProps.put(k, v) }
     logProps
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index f7746c7a69b94..d6008107aba43 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1808,7 +1808,7 @@ class LogCleanerTest extends Logging {
     val map = new FakeOffsetMap(1000)
     val logProps = new Properties()
     logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 120: java.lang.Integer)
-    logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 120: java.lang.Integer)
+    logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
     logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
     val logConfig = new LogConfig(logProps)
     val log = makeLog(config = logConfig)
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index a2b49685b4357..ea2d03484778d 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -811,7 +811,7 @@ class LogLoaderTest {
     Files.createFile(bogusTimeIndex2.toPath)

     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 1)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1024, indexIntervalBytes = 1)
     val log = createLog(logDir, logConfig)

     // Force the segment to access the index files because we are doing index lazy loading.
@@ -841,7 +841,7 @@ class LogLoaderTest {
   def testReopenThenTruncate(): Unit = {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
     // create a log
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1024, indexIntervalBytes = 10000)
     var log = createLog(logDir, logConfig)

     // add enough messages to roll over several segments then close and re-open and attempt to truncate
@@ -860,7 +860,7 @@ class LogLoaderTest {
   @Test
   def testOpenDeletesObsoleteFiles(): Unit = {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1024, retentionMs = 999)
     var log = createLog(logDir, logConfig)

     // append some messages to create some segments
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index e66418d35cf7d..d91125354aa68 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -2364,7 +2364,7 @@ class UnifiedLogTest {
   def testAsyncDelete(): Unit = {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000L)
     val asyncDeleteMs = 1000
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000,
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1024, indexIntervalBytes = 10000,
       retentionMs = 999, fileDeleteDelayMs = asyncDeleteMs)
     val log = createLog(logDir, logConfig)

@@ -2657,7 +2657,7 @@ class UnifiedLogTest {
   @Test
   def testDeleteOldSegments(): Unit = {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1024, retentionMs = 999)
     val log = createLog(logDir, logConfig)

     // append some messages to create some segments
@@ -2707,7 +2707,7 @@ class UnifiedLogTest {
   @Test
   def testLogDeletionAfterClose(): Unit = {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1024, retentionMs = 999)
     val log = createLog(logDir, logConfig)

     // append some messages to create some segments
@@ -3543,7 +3543,7 @@ class UnifiedLogTest {
   def testSegmentDeletionWithHighWatermarkInitialization(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(
       segmentBytes = 512,
-      segmentIndexBytes = 1000,
+      segmentIndexBytes = 1024,
       retentionMs = 999
     )
     val log = createLog(logDir, logConfig)
@@ -3567,7 +3567,7 @@ class UnifiedLogTest {
   def testCannotDeleteSegmentsAtOrAboveHighWatermark(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(
       segmentBytes = 512,
-      segmentIndexBytes = 1000,
+      segmentIndexBytes = 1024,
       retentionMs = 999
     )
     val log = createLog(logDir, logConfig)
@@ -3610,7 +3610,7 @@ class UnifiedLogTest {
   def testCannotIncrementLogStartOffsetPastHighWatermark(): Unit = {
     val logConfig = LogTestUtils.createLogConfig(
       segmentBytes = 512,
-      segmentIndexBytes = 1000,
+      segmentIndexBytes = 1024,
       retentionMs = 999
     )
     val log = createLog(logDir, logConfig)
@@ -4123,7 +4123,7 @@ class UnifiedLogTest {

   @Test
   def testActiveSegmentDeletionDueToRetentionTimeBreachWithRemoteStorage(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, segmentIndexBytes = 12,
+    val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, segmentIndexBytes = 1024,
       retentionMs = 3, localRetentionMs = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
     val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index f4294329f2505..c5714519945d6 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -202,7 +202,7 @@ public Optional<String> serverConfigName(String configName) {
             .define(TopicConfig.SEGMENT_MS_CONFIG, LONG, DEFAULT_SEGMENT_MS, atLeast(1), MEDIUM, TopicConfig.SEGMENT_MS_DOC)
             .define(TopicConfig.SEGMENT_JITTER_MS_CONFIG, LONG, DEFAULT_SEGMENT_JITTER_MS, atLeast(0), MEDIUM,
                 TopicConfig.SEGMENT_JITTER_MS_DOC)
-            .define(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, INT, ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT, atLeast(4), MEDIUM,
+            .define(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, INT, ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT, atLeast(1024), MEDIUM,
                 TopicConfig.SEGMENT_INDEX_BYTES_DOC)
             .define(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, LONG, ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_DEFAULT, atLeast(1), MEDIUM,
                 TopicConfig.FLUSH_MESSAGES_INTERVAL_DOC)
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index dc6a0cfb3aa78..df7af5009fab7 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -619,7 +619,7 @@ public void testCreateWithInitFileSizeAppendMessage() throws IOException {
        File tempDir = TestUtils.tempDirectory();
        Map configMap = new HashMap<>();
        configMap.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 10);
-       configMap.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000);
+       configMap.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024);
        configMap.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, 0);
        LogConfig logConfig = new LogConfig(configMap);
        try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM, false,
@@ -643,7 +643,7 @@ public void testCreateWithInitFileSizeClearShutdown() throws IOException {
        // Set up the log configuration
        Map configMap = new HashMap<>();
        configMap.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 10);
-       configMap.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000);
+       configMap.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024);
        configMap.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, 0);
        LogConfig logConfig = new LogConfig(configMap);

diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
index d73ba53b677be..2733bf084bf8f 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java
@@ -22,6 +22,7 @@

 import java.util.Arrays;
 import java.util.Collections;
+import java.util.stream.IntStream;

 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -39,11 +40,15 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
        final int p0 = 0;
        final int partitionCount = 1;
        final int replicationFactor = 2;
-       final int maxBatchCountPerSegment = 1;
+       final int maxBatchCountPerSegment = 86;
        final boolean enableRemoteLogStorage = true;
        final int broker0 = 0;
        final int broker1 = 1;

+       KeyValueSpec[] kvs = IntStream.range(0, 1000)
+               .mapToObj(i -> new KeyValueSpec("k" + i, "v" + i))
+               .toArray(KeyValueSpec[]::new);
+
        builder
                // create topicB with 1 partition and 1 RF
                .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
@@ -52,8 +57,7 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
                .expectSegmentToBeOffloaded(broker1, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
                .expectSegmentToBeOffloaded(broker1, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
-               .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
-                       new KeyValueSpec("k2", "v2"))
+               .produce(topicB, p0, kvs)
                // alter dir within the replica, we only expect one replicaId
                .alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
                // make sure the altered replica can still be elected as the leader
                .expectLeader(topicB, p0, broker1, true)
                // produce some more events and verify the earliest local offset
                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
                .produce(topicB, p0, new KeyValueSpec("k3", "v3"))
-               // consume from the beginning of the topic to read data from local and remote storage
+               // consume from the beginning of the topic to read data from local and remote
+               // storage
                .expectFetchFromTieredStorage(broker0, topicB, p0, 3)
                .consume(topicB, p0, 0L, 4, 3);
    }
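Note on maxBatchCountPerSegment = 86: TieredStorageTestUtils (next file) derives the topic's segment.index.bytes as 12 * maxRecordBatchPerSegment, so the old value of 1 would now yield a 12-byte index and fail the atLeast(1024) check. On my reading, 86 is simply the smallest count that clears the new floor, since 12 * 85 = 1020 < 1024 <= 12 * 86 = 1032. A hypothetical helper (class and name are mine, for illustration only) that derives it:

    public class MinBatchCount {
        // Smallest n such that bytesPerEntry * n >= floorBytes (ceiling division).
        static int minBatchCountPerSegment(int floorBytes, int bytesPerEntry) {
            return (floorBytes + bytesPerEntry - 1) / bytesPerEntry;
        }

        public static void main(String[] args) {
            System.out.println(minBatchCountPerSegment(1024, 12)); // prints 86
        }
    }

Producing 1000 keyed records instead of the original three then spreads the data over roughly a dozen segments, so the offload expectations above should still see multiple rolled segments.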
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
index ef7b839d01980..c7402259edd7e 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
@@ -178,6 +178,8 @@ public static Map<String, String> createTopicConfigForRemoteStorage(boolean enab
        // a "small" number of records (i.e. such that the average record size times the number of records is
        // much less than the segment size), the number of records which hold in a segment is the multiple of 12
        // defined below.
+
+       // TODO: WIP - this segment-roll approach is no longer valid with the constraint change in this PR; need to consider options for dealing with it.
        topicProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(12 * maxRecordBatchPerSegment));
        // To verify records physically absent from Kafka's storage can be consumed via the second tier storage, we
        // want to delete log segments as soon as possible. When tiered storage is active, an inactive log
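Note on the TODO above: as I understand the sizing trick these tests rely on, a time-index entry is 12 bytes, and with a small index.interval.bytes every appended batch adds an entry, so an index capped at 12 * N bytes fills up and forces a segment roll after roughly N batches; the new 1024-byte floor means the cap can no longer be made arbitrarily small, which is what the TODO flags. An illustrative sketch of the resulting topic sizing (values taken from the AlterLogDirTest change above; the class itself is mine, not part of this patch):

    import java.util.Properties;

    import org.apache.kafka.common.config.TopicConfig;

    public class SegmentRollSizingSketch {
        public static void main(String[] args) {
            int maxRecordBatchPerSegment = 86; // as in AlterLogDirTest above
            Properties topicProps = new Properties();
            // 12 * 86 = 1032 bytes holds 86 twelve-byte time-index entries and
            // still satisfies atLeast(1024); once the index is full the broker
            // rolls the segment, capping it at about 86 record batches.
            topicProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG,
                    String.valueOf(12 * maxRecordBatchPerSegment));
            System.out.println(topicProps);
        }
    }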