KAFKA-18545 Remove Zookeeper logic from LogManager #18592

Merged Feb 3, 2025 (10 commits)
11 changes: 3 additions & 8 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -353,8 +353,8 @@ class LogManager(logDirs: Seq[File],
addStrayLog(topicPartition, log)
warn(s"Loaded stray log: $logDir")
} else if (isStray(log)) {
- // Unlike Zookeeper mode, which tracks pending topic deletions under a ZNode, KRaft is unable to prevent a topic from being recreated before every replica has been deleted.
- // A KRaft broker with an offline directory may be unable to detect it still holds a to-be-deleted replica,
+ // We are unable to prevent a topic from being recreated before every replica has been deleted.
+ // A broker with an offline directory may be unable to detect it still holds a to-be-deleted replica,
// and can create a conflicting topic partition for a new incarnation of the topic in one of the remaining online directories.
// So upon a restart in which the offline directory is back online we need to clean up the old replica directory.
log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false)
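For context, the stray-log handling that this comment sits in looks roughly as follows. This is a simplified sketch assembled from the identifiers visible in the hunk (isStray, logStrayDirName, renameDir, addStrayLog); the StrayDirSuffix branch and the overall structure are assumptions, not verbatim code:

```scala
// Simplified sketch of the stray-log branches during log loading; identifiers
// come from the hunk above, the surrounding structure is an assumption.
if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
  // The directory was already renamed on an earlier pass; just register it.
  addStrayLog(topicPartition, log)
  warn(s"Loaded stray log: $logDir")
} else if (isStray(log)) {
  // The replica survived a topic deletion (e.g. its directory was offline at
  // the time), so rename it with the stray suffix and register it for cleanup.
  log.renameDir(UnifiedLog.logStrayDirName(log.topicPartition), shouldReinitialize = false)
  addStrayLog(topicPartition, log)
}
```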
@@ -950,7 +950,6 @@ class LogManager(logDirs: Seq[File],
wasRemoteLogEnabled: Boolean): Unit = {
topicConfigUpdated(topic)
val logs = logsByTopic(topic)
- // Combine the default properties with the overrides in zk to create the new LogConfig
val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig)
val isRemoteLogStorageEnabled = newLogConfig.remoteStorageEnable()
// We would like to validate the configuration no matter whether the logs have materialised on disk or not.
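For illustration, the update path around this hunk builds the effective config by layering the topic overrides on top of the broker defaults and then applies it to every materialised log. fromProps and logsByTopic appear in the diff; the updateConfig loop is an assumption about the code elided below:

```scala
// Sketch of the topic-config update path. Building the LogConfig up front
// means the new configuration is parsed and checked even when `logs` is
// empty, i.e. before any log for the topic has materialised on disk.
// The foreach/updateConfig step is an assumption about the elided code.
val logs = logsByTopic(topic)
val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig)
logs.foreach(log => log.updateConfig(newLogConfig))
```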
@@ -1079,11 +1078,7 @@ class LogManager(logDirs: Seq[File],

log
}
- // When running a ZK controller, we may get a log that does not have a topic ID. Assign it here.
- if (log.topicId.isEmpty) {
-   topicId.foreach(log.assignTopicId)
- }


Member


Should we change the topic id so it's not optional?

Collaborator Author


Although this method doesn't utilize Optional's functionality internally, the calling method Partition#createLogIfNotExists uses Optional features, and Optional is still required when passing it to the UnifiedLog constructor later. Therefore, I don't see a compelling reason to avoid using Optional at this point.

def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid],
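To illustrate the reply: the Option[Uuid] enters at Partition#createLogIfNotExists (the signature quoted above) and is threaded through to the UnifiedLog constructor, which is why dropping it in the middle of the chain buys little. A hedged sketch of that flow, with simplified signatures and bodies:

```scala
// Hedged sketch of the Option[Uuid] flow; only createLogIfNotExists and its
// topicId parameter are taken from the diff, the rest is simplified.
def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean,
                         offsetCheckpoints: OffsetCheckpoints,
                         topicId: Option[Uuid]): Unit = {
  // Partition hands the Option to LogManager unchanged...
  logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId)
  // ...and LogManager ultimately passes it to the UnifiedLog constructor, so
  // the parameter stays an Option all the way down even though KRaft always
  // supplies a concrete ID.
}
```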

private def initializeTopicId(): Unit = {

Member

@ijuma ijuma Jan 18, 2025


Makes sense. Unrelated to this PR, but a good clean-up would be to extract the topic id creation so it's always set by the time the UnifiedLog constructor is called (the way it works now unnecessarily exposes an optional topic id even though it is never really optional with KRaft).
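A rough sketch of the clean-up being suggested, under the assumption that the ID can be resolved before construction (hypothetical names, not code from this PR):

```scala
import org.apache.kafka.common.{TopicPartition, Uuid}

// Hypothetical sketch: resolve the topic ID up front so the constructor takes
// a concrete Uuid instead of an Option that is never really empty under KRaft.
final case class SimplifiedLog(topicPartition: TopicPartition, topicId: Uuid)

def createLog(tp: TopicPartition, maybeTopicId: Option[Uuid]): SimplifiedLog = {
  // Under KRaft the controller always supplies an ID, so failing loudly here
  // is safe and every consumer downstream sees a non-optional Uuid.
  val topicId = maybeTopicId.getOrElse(
    throw new IllegalStateException(s"No topic ID provided for $tp"))
  SimplifiedLog(tp, topicId)
}
```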

Collaborator Author

@m1a2st m1a2st Jan 18, 2025


It's a good idea; I will file a Jira to track this issue.

Update: https://issues.apache.org/jira/browse/KAFKA-18586

Member


We should do this cleanup after all ZK-related paths are removed from ReplicaManager.

// Ensure topic IDs are consistent
topicId.foreach { topicId =>
log.topicId.foreach { logTopicId =>
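The hunk is cut off above; presumably the check continues by comparing the two IDs and failing on a mismatch. A sketch of that shape, where the exception type is an assumption suggested by the testInconsistentIdReturnsError test kept below:

```scala
// Sketch of how the truncated consistency check presumably completes; the
// exception type is an assumption, suggested by testInconsistentIdReturnsError.
topicId.foreach { topicId =>
  log.topicId.foreach { logTopicId =>
    if (topicId != logTopicId)
      throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log " +
        s"for topic partition ${log.topicPartition}, but log already contained topic ID $logTopicId")
  }
}
```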
47 changes: 0 additions & 47 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -4188,53 +4188,6 @@ class ReplicaManagerTest {
}
}

- @Test
- def testPartitionMetadataFileCreatedWithExistingLog(): Unit = {
-   val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
-   try {
-     val brokerList = Seq[Integer](0, 1).asJava
-     val topicPartition = new TopicPartition(topic, 0)
-
-     replicaManager.logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
-
-     assertTrue(replicaManager.getLog(topicPartition).isDefined)
-     var log = replicaManager.getLog(topicPartition).get
-     assertEquals(None, log.topicId)
-     assertFalse(log.partitionMetadataFile.get.exists())
-
-     val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-     val topicNames = topicIds.asScala.map(_.swap).asJava
-
-     def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-       Seq(new LeaderAndIsrRequest.PartitionState()
-         .setTopicName(topic)
-         .setPartitionIndex(0)
-         .setControllerEpoch(0)
-         .setLeader(0)
-         .setLeaderEpoch(epoch)
-         .setIsr(brokerList)
-         .setPartitionEpoch(0)
-         .setReplicas(brokerList)
-         .setIsNew(true)).asJava,
-       topicIds,
-       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
-     val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
-     assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
-     assertFalse(replicaManager.localLog(topicPartition).isEmpty)
-     val id = topicIds.get(topicPartition.topic())
Member


Looking at this again, does this test actually make sense to keep? It's supposed to test that the metadata file gets created, but we've now removed those assertions.

Member


Yeah, I also wasn't sure whether removing the assertions makes sense here. I was planning to take a closer look. If the assertions are not useful, then we should check whether the test is useful.

Collaborator Author


> Looking at this again, does this test actually make sense to keep? It's supposed to test that the metadata file gets created, but we've now removed those assertions.

If we're removing the created-file assertion, we should also remove this test. This makes sense to me.

-     log = replicaManager.localLog(topicPartition).get
-     assertTrue(log.partitionMetadataFile.get.exists())
-     val partitionMetadata = log.partitionMetadataFile.get.read()
-
-     // Current version of PartitionMetadataFile is 0.
-     assertEquals(0, partitionMetadata.version)
-     assertEquals(id, partitionMetadata.topicId)
-   } finally {
-     replicaManager.shutdown(checkpointHW = false)
-   }
- }

@Test
def testInconsistentIdReturnsError(): Unit = {
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))