Skip to content

Commit 6eddaeb

Browse files
authored
KAFKA-18532: Clean Partition.scala zookeeper logic (#18594)
Reviewers: Ismael Juma <ismael@juma.me.uk>
1 parent ff3de0c commit 6eddaeb

2 files changed

Lines changed: 3 additions & 19 deletions

File tree

core/src/main/scala/kafka/cluster/Partition.scala

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import kafka.server.metadata.KRaftMetadataCache
2727
import kafka.server.share.DelayedShareFetch
2828
import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
2929
import kafka.utils._
30-
import kafka.zookeeper.ZooKeeperClientException
3130
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, TopicPartition, Uuid}
3231
import org.apache.kafka.common.errors._
3332
import org.apache.kafka.common.message.AlterPartitionRequestData.BrokerState
@@ -767,14 +766,7 @@ class Partition(val topicPartition: TopicPartition,
767766
LeaderRecoveryState.RECOVERED
768767
)
769768

770-
try {
771-
createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetDirectoryId)
772-
} catch {
773-
case e: ZooKeeperClientException =>
774-
stateChangeLogger.error(s"A ZooKeeper client exception has occurred and makeLeader will be skipping the " +
775-
s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)
776-
return false
777-
}
769+
createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetDirectoryId)
778770

779771
val leaderLog = localLogOrException
780772

@@ -868,14 +860,7 @@ class Partition(val topicPartition: TopicPartition,
868860
LeaderRecoveryState.of(partitionState.leaderRecoveryState)
869861
)
870862

871-
try {
872-
createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
873-
} catch {
874-
case e: ZooKeeperClientException =>
875-
stateChangeLogger.error(s"A ZooKeeper client exception has occurred. makeFollower will be skipping the " +
876-
s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)
877-
return false
878-
}
863+
createLogInAssignedDirectoryId(partitionState, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
879864

880865
val followerLog = localLogOrException
881866
if (isNewLeaderEpoch) {

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2833,7 +2833,7 @@ class ReplicaManager(val config: KafkaConfig,
28332833
val leaderChangedPartitions = new mutable.HashSet[Partition]
28342834
val followerChangedPartitions = new mutable.HashSet[Partition]
28352835
if (!localChanges.leaders.isEmpty) {
2836-
applyLocalLeadersDelta(leaderChangedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala, localChanges.directoryIds.asScala)
2836+
applyLocalLeadersDelta(leaderChangedPartitions, delta, lazyOffsetCheckpoints, localChanges.leaders.asScala, localChanges.directoryIds.asScala)
28372837
}
28382838
if (!localChanges.followers.isEmpty) {
28392839
applyLocalFollowersDelta(followerChangedPartitions, newImage, delta, lazyOffsetCheckpoints, localChanges.followers.asScala, localChanges.directoryIds.asScala)
@@ -2857,7 +2857,6 @@ class ReplicaManager(val config: KafkaConfig,
28572857

28582858
private def applyLocalLeadersDelta(
28592859
changedPartitions: mutable.Set[Partition],
2860-
newImage: MetadataImage,
28612860
delta: TopicsDelta,
28622861
offsetCheckpoints: OffsetCheckpoints,
28632862
localLeaders: mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo],

0 commit comments

Comments
 (0)