KAFKA-16368: Constraints update for segment.index.bytes #18596

Open — wants to merge 1 commit into base: trunk
@@ -102,7 +102,7 @@ class AbstractPartitionTest {
def createLogProperties(overrides: Map[String, String]): Properties = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
-logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000: java.lang.Integer)
+logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, 999: java.lang.Integer)
overrides.foreach { case (k, v) => logProps.put(k, v) }
logProps
@@ -365,7 +365,7 @@ class PartitionLockTest extends Logging {
private def createLogProperties(overrides: Map[String, String]): Properties = {
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 512: java.lang.Integer)
-logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000: java.lang.Integer)
+logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, 999: java.lang.Integer)
overrides.foreach { case (k, v) => logProps.put(k, v) }
logProps
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala (2 changes: 1 addition & 1 deletion)
@@ -1808,7 +1808,7 @@ class LogCleanerTest extends Logging {
val map = new FakeOffsetMap(1000)
val logProps = new Properties()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 120: java.lang.Integer)
-logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 120: java.lang.Integer)
+logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
val logConfig = new LogConfig(logProps)
val log = makeLog(config = logConfig)
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala (6 changes: 3 additions & 3 deletions)
@@ -811,7 +811,7 @@ class LogLoaderTest {
Files.createFile(bogusTimeIndex2.toPath)

def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
-val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 1)
+val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1024, indexIntervalBytes = 1)
val log = createLog(logDir, logConfig)

// Force the segment to access the index files because we are doing index lazy loading.
@@ -841,7 +841,7 @@ class LogLoaderTest {
def testReopenThenTruncate(): Unit = {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds)
// create a log
-val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000)
+val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1024, indexIntervalBytes = 10000)
var log = createLog(logDir, logConfig)

// add enough messages to roll over several segments then close and re-open and attempt to truncate
@@ -860,7 +860,7 @@
@Test
def testOpenDeletesObsoleteFiles(): Unit = {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
-val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1024, retentionMs = 999)
var log = createLog(logDir, logConfig)

// append some messages to create some segments
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala (14 changes: 7 additions & 7 deletions)
@@ -2364,7 +2364,7 @@ class UnifiedLogTest {
def testAsyncDelete(): Unit = {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000L)
val asyncDeleteMs = 1000
-val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, indexIntervalBytes = 10000,
+val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1024, indexIntervalBytes = 10000,
retentionMs = 999, fileDeleteDelayMs = asyncDeleteMs)
val log = createLog(logDir, logConfig)

@@ -2657,7 +2657,7 @@ class UnifiedLogTest {
@Test
def testDeleteOldSegments(): Unit = {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
-val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1024, retentionMs = 999)
val log = createLog(logDir, logConfig)

// append some messages to create some segments
@@ -2707,7 +2707,7 @@ class UnifiedLogTest {
@Test
def testLogDeletionAfterClose(): Unit = {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
-val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1024, retentionMs = 999)
val log = createLog(logDir, logConfig)

// append some messages to create some segments
@@ -3543,7 +3543,7 @@ class UnifiedLogTest {
def testSegmentDeletionWithHighWatermarkInitialization(): Unit = {
val logConfig = LogTestUtils.createLogConfig(
segmentBytes = 512,
-segmentIndexBytes = 1000,
+segmentIndexBytes = 1024,
retentionMs = 999
)
val log = createLog(logDir, logConfig)
@@ -3567,7 +3567,7 @@
def testCannotDeleteSegmentsAtOrAboveHighWatermark(): Unit = {
val logConfig = LogTestUtils.createLogConfig(
segmentBytes = 512,
-segmentIndexBytes = 1000,
+segmentIndexBytes = 1024,
retentionMs = 999
)
val log = createLog(logDir, logConfig)
@@ -3610,7 +3610,7 @@ class UnifiedLogTest {
def testCannotIncrementLogStartOffsetPastHighWatermark(): Unit = {
val logConfig = LogTestUtils.createLogConfig(
segmentBytes = 512,
-segmentIndexBytes = 1000,
+segmentIndexBytes = 1024,
retentionMs = 999
)
val log = createLog(logDir, logConfig)
@@ -4123,7 +4123,7 @@ class UnifiedLogTest {

@Test
def testActiveSegmentDeletionDueToRetentionTimeBreachWithRemoteStorage(): Unit = {
-val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, segmentIndexBytes = 12,
+val logConfig = LogTestUtils.createLogConfig(indexIntervalBytes = 1, segmentIndexBytes = 1024,
retentionMs = 3, localRetentionMs = 1, fileDeleteDelayMs = 0, remoteLogStorageEnable = true)
val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)

@@ -202,7 +202,7 @@ public Optional<String> serverConfigName(String configName) {
.define(TopicConfig.SEGMENT_MS_CONFIG, LONG, DEFAULT_SEGMENT_MS, atLeast(1), MEDIUM, TopicConfig.SEGMENT_MS_DOC)
.define(TopicConfig.SEGMENT_JITTER_MS_CONFIG, LONG, DEFAULT_SEGMENT_JITTER_MS, atLeast(0), MEDIUM,
TopicConfig.SEGMENT_JITTER_MS_DOC)
-.define(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, INT, ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT, atLeast(4), MEDIUM,
+.define(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, INT, ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT, atLeast(1024), MEDIUM,
TopicConfig.SEGMENT_INDEX_BYTES_DOC)
.define(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, LONG, ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_DEFAULT, atLeast(1), MEDIUM,
TopicConfig.FLUSH_MESSAGES_INTERVAL_DOC)
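For illustration only (not part of this diff): with the validator tightened from atLeast(4) to atLeast(1024), any segment.index.bytes value below 1 KiB is rejected when the LogConfig is constructed, while the new floor itself still passes. The sketch below assumes the LogConfig import path shown and uses a hypothetical test class name; it is not a test added by this PR.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.storage.internals.log.LogConfig;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class SegmentIndexBytesFloorSketchTest {

    @Test
    public void rejectsSegmentIndexBytesBelowTheNewMinimum() {
        // Below the atLeast(1024) floor: LogConfig construction fails validation.
        Map<String, Object> tooSmall = new HashMap<>();
        tooSmall.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 512);
        assertThrows(ConfigException.class, () -> new LogConfig(tooSmall));

        // Exactly at the floor: accepted, with all other properties falling back to defaults.
        Map<String, Object> atFloor = new HashMap<>();
        atFloor.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024);
        assertDoesNotThrow(() -> new LogConfig(atFloor));
    }
}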
@@ -619,7 +619,7 @@ public void testCreateWithInitFileSizeAppendMessage() throws IOException {
File tempDir = TestUtils.tempDirectory();
Map<String, Object> configMap = new HashMap<>();
configMap.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 10);
-configMap.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000);
+configMap.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024);
configMap.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, 0);
LogConfig logConfig = new LogConfig(configMap);
try (LogSegment seg = LogSegment.open(tempDir, 40, logConfig, Time.SYSTEM, false,
@@ -643,7 +643,7 @@ public void testCreateWithInitFileSizeClearShutdown() throws IOException {
// Set up the log configuration
Map<String, Object> configMap = new HashMap<>();
configMap.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 10);
-configMap.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1000);
+configMap.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024);
configMap.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, 0);
LogConfig logConfig = new LogConfig(configMap);

@@ -22,6 +22,7 @@

import java.util.Arrays;
import java.util.Collections;
+import java.util.stream.IntStream;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -39,11 +40,15 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final int p0 = 0;
final int partitionCount = 1;
final int replicationFactor = 2;
-final int maxBatchCountPerSegment = 1;
+final int maxBatchCountPerSegment = 86;
final boolean enableRemoteLogStorage = true;
final int broker0 = 0;
final int broker1 = 1;

+KeyValueSpec[] kvs = IntStream.range(0, 1000)
+        .mapToObj(i -> new KeyValueSpec("k" + i, "v" + i))
+        .toArray(KeyValueSpec[]::new);

builder
// create topicB with 1 partition and 1 RF
.createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
@@ -52,16 +57,16 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
.expectSegmentToBeOffloaded(broker1, topicB, p0, 0, new KeyValueSpec("k0", "v0"))
.expectSegmentToBeOffloaded(broker1, topicB, p0, 1, new KeyValueSpec("k1", "v1"))
.expectEarliestLocalOffsetInLogDirectory(topicB, p0, 2L)
-.produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
-        new KeyValueSpec("k2", "v2"))
+.produce(topicB, p0, kvs)
// alter dir within the replica, we only expect one replicaId
.alterLogDir(topicB, p0, Collections.singletonList(broker0).get(0))
// make sure the altered replica can still be elected as the leader
.expectLeader(topicB, p0, broker0, true)
// produce some more events and verify the earliest local offset
.expectEarliestLocalOffsetInLogDirectory(topicB, p0, 3L)
.produce(topicB, p0, new KeyValueSpec("k3", "v3"))
-// consume from the beginning of the topic to read data from local and remote storage
+// consume from the beginning of the topic to read data from local and remote
+// storage
.expectFetchFromTieredStorage(broker0, topicB, p0, 3)
.consume(topicB, p0, 0L, 4, 3);
}
@@ -178,6 +178,8 @@ public static Map<String, String> createTopicConfigForRemoteStorage(boolean enab
// a "small" number of records (i.e. such that the average record size times the number of records is
// much less than the segment size), the number of records which hold in a segment is the multiple of 12
// defined below.

+// TODO: WIP - Need to consider options for dealing with this segment roll approach not being valid for this PR.
topicProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(12 * maxRecordBatchPerSegment));
Contributor Author (review comment): @divijvaidya let's discuss how best to handle this please

// To verify records physically absent from Kafka's storage can be consumed via the second tier storage, we
// want to delete log segments as soon as possible. When tiered storage is active, an inactive log
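One hypothetical option for the TODO above, sketched only to frame the discussion (the Math.max clamp is an assumption, not something this PR adopts): keep the 12-bytes-per-batch sizing for larger batch counts while never dropping below the new 1024-byte floor. The fragment reuses topicProps and maxRecordBatchPerSegment from the method shown in the diff; the trade-off is that for small maxRecordBatchPerSegment values the clamped index size no longer forces a roll after exactly that many batches, which is the concern raised in the review comment above.

// Hypothetical adjustment (not part of this diff): respect the new atLeast(1024) minimum
// while keeping the index-size-based roll heuristic for larger batch counts. When
// 12 * maxRecordBatchPerSegment < 1024, the clamp changes how many batches fit per segment.
int segmentIndexBytes = Math.max(1024, 12 * maxRecordBatchPerSegment);
topicProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(segmentIndexBytes));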