From f5819360330af0b8ab678e301b4a59159d21e8fa Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sat, 18 Jan 2025 19:55:45 +0800 Subject: [PATCH 1/6] remove zkClient in test --- .../coordinator/AbstractCoordinatorConcurrencyTest.scala | 3 --- .../kafka/coordinator/group/GroupCoordinatorTest.scala | 9 +-------- .../transaction/TransactionStateManagerTest.scala | 5 ----- 3 files changed, 1 insertion(+), 16 deletions(-) diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 242a781c2113e..df8b6b6f6d441 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -27,7 +27,6 @@ import kafka.log.{LogManager, UnifiedLog} import kafka.server.QuotaFactory.QuotaManagers import kafka.server.{KafkaConfig, _} import kafka.utils._ -import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidationStats} @@ -50,7 +49,6 @@ abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] extend val serverProps = TestUtils.createBrokerConfig(0) val random = new Random var replicaManager: TestReplicaManager = _ - var zkClient: KafkaZkClient = _ var time: MockTime = _ var timer: MockTimer = _ var executor: ExecutorService = _ @@ -67,7 +65,6 @@ abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] extend val producePurgatory = new DelayedOperationPurgatory[DelayedProduce]("Produce", timer, 1, 1000, false, true) val watchKeys = Collections.newSetFromMap(new ConcurrentHashMap[TopicPartitionOperationKey, java.lang.Boolean]()).asScala replicaManager = TestReplicaManager(KafkaConfig.fromProps(serverProps), time, scheduler, timer, mockLogMger, mock(classOf[QuotaManagers], withSettings().stubOnly()), producePurgatory, watchKeys) - zkClient = mock(classOf[KafkaZkClient], withSettings().stubOnly()) } @AfterEach diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 025cfdac528a0..a0a683f8d3990 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -29,7 +29,6 @@ import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import kafka.cluster.Partition -import kafka.zk.KafkaZkClient import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription import org.apache.kafka.clients.consumer.internals.ConsumerProtocol import org.apache.kafka.common.internals.Topic @@ -79,7 +78,6 @@ class GroupCoordinatorTest { var groupCoordinator: GroupCoordinator = _ var replicaManager: ReplicaManager = _ var scheduler: KafkaScheduler = _ - var zkClient: KafkaZkClient = _ private val groupId = "groupId" private val protocolType = "consumer" @@ -111,10 +109,6 @@ class GroupCoordinatorTest { replicaManager = mock(classOf[ReplicaManager]) - zkClient = mock(classOf[KafkaZkClient]) - // make two partitions of the group topic to make sure some partitions are not owned by the coordinator - when(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).thenReturn(Some(2)) - timer = new MockTimer val config = KafkaConfig.fromProps(props) @@ -123,8 +117,7 @@ class GroupCoordinatorTest { val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, 1000, config.brokerId, false, true) groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, new Metrics()) - groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions), - enableMetadataExpiration = false) + groupCoordinator.startup(() => 2, enableMetadataExpiration = false) // add the partition into the owned partition list groupPartitionId = groupCoordinator.partitionFor(groupId) diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 28aef1573a938..41e6b1a954a5f 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -24,7 +24,6 @@ import javax.management.ObjectName import kafka.log.UnifiedLog import kafka.server.{MetadataCache, ReplicaManager} import kafka.utils.{Pool, TestUtils} -import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME @@ -64,13 +63,9 @@ class TransactionStateManagerTest { val time = new MockTime() val scheduler = new MockScheduler(time) - val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) val metadataCache: MetadataCache = mock(classOf[MetadataCache]) - when(zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME)) - .thenReturn(Some(numPartitions)) - when(metadataCache.features()).thenReturn { new FinalizedFeatures( MetadataVersion.latestTesting(), From 8b9af5b1de06ef8ea741b5a9675297df3f53214f Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sat, 18 Jan 2025 20:01:28 +0800 Subject: [PATCH 2/6] remove zkClient in test --- .../group/GroupCoordinatorConcurrencyTest.scala | 10 ++-------- .../TransactionCoordinatorConcurrencyTest.scala | 6 +----- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala index a0917e1ee4291..3eecdfe65e190 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala @@ -37,7 +37,6 @@ import org.apache.kafka.server.common.RequestLocal import org.apache.kafka.server.purgatory.DelayedOperationPurgatory import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} -import org.mockito.Mockito.when import scala.collection._ import scala.concurrent.duration.Duration @@ -71,9 +70,6 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest override def setUp(): Unit = { super.setUp() - when(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)) - .thenReturn(Some(numPartitions)) - serverProps.setProperty(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, ConsumerMinSessionTimeout.toString) serverProps.setProperty(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, ConsumerMaxSessionTimeout.toString) serverProps.setProperty(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, GroupInitialRebalanceDelay.toString) @@ -85,8 +81,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest metrics = new Metrics groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, metrics) - groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions), - enableMetadataExpiration = false) + groupCoordinator.startup(() => numPartitions, enableMetadataExpiration = false) // Transactional appends attempt to schedule to the request handler thread using // a non request handler thread. Set this to avoid error. @@ -156,8 +151,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest groupCoordinator.shutdown() groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, new Metrics()) - groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.groupCoordinatorConfig.offsetsTopicPartitions), - enableMetadataExpiration = false) + groupCoordinator.startup(() => numPartitions, enableMetadataExpiration = false) val members = new Group(s"group", nMembersPerGroup, groupCoordinator) .members diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index 363eaa9719c00..24000894fe9bb 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -74,9 +74,6 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren override def setUp(): Unit = { super.setUp() - when(zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME)) - .thenReturn(Some(numPartitions)) - val brokerNode = new Node(0, "host", 10) val metadataCache: MetadataCache = mock(classOf[MetadataCache]) when(metadataCache.getPartitionLeaderEndpoint( @@ -98,8 +95,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren txnStateManager = new TransactionStateManager(0, scheduler, replicaManager, metadataCache, txnConfig, time, new Metrics()) - txnStateManager.startup(() => zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME).get, - enableTransactionalIdExpiration = true) + txnStateManager.startup(() => numPartitions, enableTransactionalIdExpiration = true) for (i <- 0 until numPartitions) txnStateManager.addLoadedTransactionsToCache(i, coordinatorEpoch, new Pool[String, TransactionMetadata]()) From 367c02ca1c045abbe826a0939fd031cf9472d1ba Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 19 Jan 2025 01:44:08 +0800 Subject: [PATCH 3/6] remove KafkaZkClient.scala --- .../main/scala/kafka/zk/KafkaZkClient.scala | 297 ------------------ .../kafka/zookeeper/ZooKeeperClient.scala | 267 ---------------- .../authorizer/BaseAuthorizerTest.scala | 2 - 3 files changed, 566 deletions(-) delete mode 100644 core/src/main/scala/kafka/zk/KafkaZkClient.scala delete mode 100755 core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala deleted file mode 100644 index 8b62d3c178292..0000000000000 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ /dev/null @@ -1,297 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package kafka.zk - -import java.util.Properties -import kafka.cluster.Broker -import kafka.controller.ReplicaAssignment -import kafka.utils.Logging -import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} -import org.apache.kafka.common.{TopicPartition, Uuid} - -import scala.collection.{Map, Seq} - -/** - * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]]. - * - * Implementation note: this class includes methods for various components (Controller, Configs, Old Consumer, etc.) - * and returns instances of classes from the calling packages in some cases. This is not ideal, but it made it - * easier to migrate away from `ZkUtils` (since removed). We should revisit this. We should also consider whether a - * monolithic [[kafka.zk.ZkData]] is the way to go. - */ -class KafkaZkClient() extends AutoCloseable with Logging { - - /** - * Get entity configs for a given entity name - * @param rootEntityType entity type - * @param sanitizedEntityName entity name - * @return The successfully gathered log configs - */ - def getEntityConfigs(rootEntityType: String, sanitizedEntityName: String): Properties = { - throw new UnsupportedOperationException() - } - - /** - * Sets or creates the entity znode path with the given configs depending - * on whether it already exists or not. - * - * If enableEntityConfigControllerCheck is set, this method will ensure that a ZK controller is defined and - * that it is not modified within the duration of this call. This is done to prevent configs from being - * created or modified while the ZK to KRaft migration is taking place. - * - * The only case where enableEntityConfigControllerCheck should be false is when being called by ConfigCommand, - * i.e., "kafka-configs.sh --zookeeper". This is an old behavior we have kept around to allow users to setup - * SCRAM credentials and other configs before the cluster is started for the first time. - * - * If this is method is called concurrently, the last writer wins. In cases where we update configs and then - * partition assignment (i.e. create topic), it's possible for one thread to set this and the other to set the - * partition assignment. As such, the recommendation is to never call create topic for the same topic with different - * configs/partition assignment concurrently. - * - * @param rootEntityType entity type - * @param sanitizedEntityName entity name - * @throws KeeperException if there is an error while setting or creating the znode - */ - def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Returns all the entities for a given entityType - * @param entityType entity type - * @return List of all entity names - */ - def getAllEntitiesWithConfig(entityType: String): Seq[String] = { - throw new UnsupportedOperationException() - } - - /** - * Creates config change notification - * @param sanitizedEntityPath sanitizedEntityPath path to write - * @throws KeeperException if there is an error while setting or creating the znode - */ - def createConfigChangeNotification(sanitizedEntityPath: String): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Gets all brokers in the cluster. - * @return sequence of brokers in the cluster. - */ - def getAllBrokersInCluster: Seq[Broker] = { - throw new UnsupportedOperationException() - } - - /** - * Gets all topics in the cluster. - * @param registerWatch indicates if a watch must be registered or not - * @return sequence of topics in the cluster. - */ - def getAllTopicsInCluster(registerWatch: Boolean = false): Set[String] = { - throw new UnsupportedOperationException() - } - - /** - * Checks the topic existence - * @param topicName the name of the topic to check - * @return true if topic exists else false - */ - def topicExists(topicName: String): Boolean = { - throw new UnsupportedOperationException() - } - - /** - * Sets the topic znode with the given assignment. - * @param topic the topic whose assignment is being set. - * @param topicId unique topic ID for the topic if the version supports it - * @param assignment the partition to replica mapping to set for the given topic - * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. - * @throws KeeperException if there is an error while setting assignment - */ - def setTopicAssignment(topic: String, - topicId: Option[Uuid], - assignment: Map[TopicPartition, ReplicaAssignment], - expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Create the topic znode with the given assignment. - * @param topic the topic whose assignment is being set. - * @param topicId unique topic ID for the topic if the version supports it - * @param assignment the partition to replica mapping to set for the given topic - * @throws KeeperException if there is an error while creating assignment - */ - def createTopicAssignment(topic: String, topicId: Option[Uuid], assignment: Map[TopicPartition, Seq[Int]]): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Gets the topic IDs for the given topics. - * @param topics the topics we wish to retrieve the Topic IDs for - * @return the Topic IDs - */ - def getTopicIdsForTopics(topics: Set[String]): Map[String, Uuid] = { - throw new UnsupportedOperationException() - } - - /** - * Gets the replica assignments for the given topics. - * This function does not return information about which replicas are being added or removed from the assignment. - * @param topics the topics whose partitions we wish to get the assignments for. - * @return the replica assignment for each partition from the given topics. - */ - def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicPartition, Seq[Int]] = { - throw new UnsupportedOperationException() - } - - /** - * Gets the replica assignments for the given topics. - * @param topics the topics whose partitions we wish to get the assignments for. - * @return the full replica assignment for each partition from the given topics. - */ - def getFullReplicaAssignmentForTopics(topics: Set[String]): Map[TopicPartition, ReplicaAssignment] = { - throw new UnsupportedOperationException() - } - - /** - * Gets the partition count for a given topic - * @param topic The topic to get partition count for. - * @return optional integer that is Some if the topic exists and None otherwise. - */ - def getTopicPartitionCount(topic: String): Option[Int] = { - throw new UnsupportedOperationException() - } - - /** - * Gets all partitions in the cluster - * @return all partitions in the cluster - */ - def getAllPartitions: Set[TopicPartition] = { - throw new UnsupportedOperationException() - } - - /** - * Gets all the child nodes at a given zk node path - * @param path the path to check - * @return list of child node names - */ - def getChildren(path : String): Seq[String] = { - throw new UnsupportedOperationException() - } - - /** - * Creates the delete topic znode. - * @param topicName topic name - * @throws KeeperException if there is an error while setting or creating the znode - */ - def createDeleteTopicPath(topicName: String): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Checks if topic is marked for deletion - * @param topic the name of the topic to check - * @return true if topic is marked for deletion, else false - */ - def isTopicMarkedForDeletion(topic: String): Boolean = { - throw new UnsupportedOperationException() - } - - /** - * Sets or creates the partition reassignment znode with the given reassignment depending on whether it already - * exists or not. - * - * @param reassignment the reassignment to set on the reassignment znode - * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. - * @throws KeeperException if there is an error while setting or creating the znode - * @deprecated Use the PartitionReassignment Kafka API instead - */ - @Deprecated - def setOrCreatePartitionReassignment(reassignment: collection.Map[TopicPartition, Seq[Int]], expectedControllerEpochZkVersion: Int): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Gets the controller id. - * @return optional integer that is Some if the controller znode exists and can be parsed and None otherwise. - */ - def getControllerId: Option[Int] = { - throw new UnsupportedOperationException() - } - - /** - * Deletes the zk node recursively - * @param path path to delete - * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. - * @param recursiveDelete enable recursive delete - * @return KeeperException if there is an error while deleting the path - */ - def deletePath(path: String, expectedControllerEpochZkVersion: Int = ZkVersion.MatchAnyVersion, recursiveDelete: Boolean = true): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Creates the required zk nodes for Delegation Token storage - */ - def createDelegationTokenPaths(): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Creates Delegation Token change notification message - * @param tokenId token Id - */ - def createTokenChangeNotification(tokenId: String): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Sets or creates token info znode with the given token details depending on whether it already - * exists or not. - * - * @param token the token to set on the token znode - * @throws KeeperException if there is an error while setting or creating the znode - */ - def setOrCreateDelegationToken(token: DelegationToken): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Gets the Delegation Token Info - * @return optional TokenInfo that is Some if the token znode exists and can be parsed and None otherwise. - */ - def getDelegationTokenInfo(delegationTokenId: String): Option[TokenInformation] = { - throw new UnsupportedOperationException() - } - - /** - * Deletes the given Delegation token node - * @param delegationTokenId - * @return delete status - */ - def deleteDelegationToken(delegationTokenId: String): Boolean = { - throw new UnsupportedOperationException() - } - - /** - * Close the underlying ZooKeeperClient. - */ - def close(): Unit = { - throw new UnsupportedOperationException() - } -} diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala deleted file mode 100755 index b6593f6c0902f..0000000000000 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.zookeeper - -import kafka.utils.Logging -import org.apache.kafka.common.utils.Time - -import scala.collection.Seq - -case class ACL() {} -case class CreateMode() {} -case class OpResult() {} -object Code { - val OK: Integer = 1 - val NONODE: Integer = 1 -} -case class Code() {} -case class Stat() {} -case class KeeperException() extends RuntimeException {} - -/** - * A ZooKeeper client that encourages pipelined requests. - * - * @param connectString comma separated host:port pairs, each corresponding to a zk server - * @param sessionTimeoutMs session timeout in milliseconds - * @param connectionTimeoutMs connection timeout in milliseconds - * @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking. - * @param clientConfig ZooKeeper client configuration, for TLS configs if desired - * @param name name of the client instance - */ -class ZooKeeperClient(connectString: String, - sessionTimeoutMs: Int, - connectionTimeoutMs: Int, - maxInFlightRequests: Int, - time: Time, - metricGroup: String, - metricType: String, - name: String) extends Logging { - - this.logIdent = s"[ZooKeeperClient $name] " - - /** - * Send a request and wait for its response. See handle(Seq[AsyncRequest]) for details. - * - * @param request a single request to send and wait on. - * @return an instance of the response with the specific type (e.g. CreateRequest -> CreateResponse). - */ - def handleRequest[Req <: AsyncRequest](request: Req): Req#Response = { - throw new UnsupportedOperationException() - } - - /** - * Send a pipelined sequence of requests and wait for all of their responses. - * - * The watch flag on each outgoing request will be set if we've already registered a handler for the - * path associated with the request. - * - * @param requests a sequence of requests to send and wait on. - * @return the responses for the requests. If all requests have the same type, the responses will have the respective - * response type (e.g. Seq[CreateRequest] -> Seq[CreateResponse]). Otherwise, the most specific common supertype - * will be used (e.g. Seq[AsyncRequest] -> Seq[AsyncResponse]). - */ - def handleRequests[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = { - throw new UnsupportedOperationException() - } - - /** - * Register the handler to ZooKeeperClient. This is just a local operation. This does not actually register a watcher. - * - * The watcher is only registered once the user calls handle(AsyncRequest) or handle(Seq[AsyncRequest]) - * with either a GetDataRequest or ExistsRequest. - * - * NOTE: zookeeper only allows registration to a nonexistent znode with ExistsRequest. - * - * @param zNodeChangeHandler the handler to register - */ - def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Unregister the handler from ZooKeeperClient. This is just a local operation. - * @param path the path of the handler to unregister - */ - def unregisterZNodeChangeHandler(path: String): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Register the handler to ZooKeeperClient. This is just a local operation. This does not actually register a watcher. - * - * The watcher is only registered once the user calls handle(AsyncRequest) or handle(Seq[AsyncRequest]) with a GetChildrenRequest. - * - * @param zNodeChildChangeHandler the handler to register - */ - def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = { - throw new UnsupportedOperationException() - } - - /** - * Unregister the handler from ZooKeeperClient. This is just a local operation. - * @param path the path of the handler to unregister - */ - def unregisterZNodeChildChangeHandler(path: String): Unit = { - throw new UnsupportedOperationException() - } - - /** - * @param stateChangeHandler - */ - def registerStateChangeHandler(stateChangeHandler: StateChangeHandler): Unit = { - throw new UnsupportedOperationException() - } - - /** - * - * @param name - */ - def unregisterStateChangeHandler(name: String): Unit = { - throw new UnsupportedOperationException() - } - - def close(): Unit = { - throw new UnsupportedOperationException() - } - - def sessionId: Long = { - throw new UnsupportedOperationException() - } -} - -trait StateChangeHandler { - val name: String - def beforeInitializingSession(): Unit = {} - def afterInitializingSession(): Unit = {} - def onAuthFailure(): Unit = {} -} - -trait ZNodeChangeHandler { - val path: String - def handleCreation(): Unit = {} - def handleDeletion(): Unit = {} - def handleDataChange(): Unit = {} -} - -trait ZNodeChildChangeHandler { - val path: String - def handleChildChange(): Unit = {} -} - -// Thin wrapper for zookeeper.Op -sealed trait ZkOp { -} - -case class CreateOp(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode) extends ZkOp { -} - -case class DeleteOp(path: String, version: Int) extends ZkOp { -} - -case class SetDataOp(path: String, data: Array[Byte], version: Int) extends ZkOp { -} - -case class CheckOp(path: String, version: Int) extends ZkOp { -} - -case class ZkOpResult(zkOp: ZkOp, rawOpResult: OpResult) - -sealed trait AsyncRequest { - /** - * This type member allows us to define methods that take requests and return responses with the correct types. - * See ``ZooKeeperClient.handleRequests`` for example. - */ - type Response <: AsyncResponse - def path: String - def ctx: Option[Any] -} - -case class CreateRequest(path: String, data: Array[Byte], acl: Seq[ACL], createMode: CreateMode, - ctx: Option[Any] = None) extends AsyncRequest { - type Response = CreateResponse -} - -case class DeleteRequest(path: String, version: Int, ctx: Option[Any] = None) extends AsyncRequest { - type Response = DeleteResponse -} - -case class ExistsRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { - type Response = ExistsResponse -} - -case class GetDataRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { - type Response = GetDataResponse -} - -case class SetDataRequest(path: String, data: Array[Byte], version: Int, ctx: Option[Any] = None) extends AsyncRequest { - type Response = SetDataResponse -} - -case class GetAclRequest(path: String, ctx: Option[Any] = None) extends AsyncRequest { - type Response = GetAclResponse -} - -case class SetAclRequest(path: String, acl: Seq[ACL], version: Int, ctx: Option[Any] = None) extends AsyncRequest { - type Response = SetAclResponse -} - -case class GetChildrenRequest(path: String, registerWatch: Boolean, ctx: Option[Any] = None) extends AsyncRequest { - type Response = GetChildrenResponse -} - -case class MultiRequest(zkOps: Seq[ZkOp], ctx: Option[Any] = None) extends AsyncRequest { - type Response = MultiResponse - - override def path: String = null -} - - -sealed abstract class AsyncResponse { - def resultCode: Code - def path: String - def ctx: Option[Any] - - def metadata: ResponseMetadata - - def resultException: Option[RuntimeException] = None -} - -case class ResponseMetadata(sendTimeMs: Long, receivedTimeMs: Long) { - def responseTimeMs: Long = receivedTimeMs - sendTimeMs -} - -case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String, - metadata: ResponseMetadata) extends AsyncResponse -case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any], - metadata: ResponseMetadata) extends AsyncResponse -case class ExistsResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, - metadata: ResponseMetadata) extends AsyncResponse -case class GetDataResponse(resultCode: Code, path: String, ctx: Option[Any], data: Array[Byte], stat: Stat, - metadata: ResponseMetadata) extends AsyncResponse -case class SetDataResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, - metadata: ResponseMetadata) extends AsyncResponse -case class GetAclResponse(resultCode: Code, path: String, ctx: Option[Any], acl: Seq[ACL], stat: Stat, - metadata: ResponseMetadata) extends AsyncResponse -case class SetAclResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat, - metadata: ResponseMetadata) extends AsyncResponse -case class GetChildrenResponse(resultCode: Code, path: String, ctx: Option[Any], children: Seq[String], stat: Stat, - metadata: ResponseMetadata) extends AsyncResponse -case class MultiResponse(resultCode: Code, path: String, ctx: Option[Any], zkOpResults: Seq[ZkOpResult], - metadata: ResponseMetadata) extends AsyncResponse - -case class ZooKeeperClientException(message: String) extends RuntimeException(message) \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala index dd8337ff3c95a..c7726ff52454f 100644 --- a/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala @@ -20,7 +20,6 @@ package kafka.security.authorizer import java.net.InetAddress import java.util.UUID import kafka.server.KafkaConfig -import kafka.zookeeper.ZooKeeperClient import org.apache.kafka.common.acl.AclOperation.{ALL, READ, WRITE} import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation} @@ -50,7 +49,6 @@ trait BaseAuthorizerTest { val requestContext: RequestContext = newRequestContext(principal, InetAddress.getByName("192.168.0.1")) val superUserName = "superuser1" var config: KafkaConfig = _ - var zooKeeperClient: ZooKeeperClient = _ var resource: ResourcePattern = _ @Test From a0eda03ecd31d131affa1d2d2d125feaa030591c Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 19 Jan 2025 03:28:57 +0800 Subject: [PATCH 4/6] add ZooKeeperClientException --- .../main/scala/kafka/zookeeper/ZooKeeperClientException.scala | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 core/src/main/scala/kafka/zookeeper/ZooKeeperClientException.scala diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClientException.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClientException.scala new file mode 100644 index 0000000000000..d0c48f894158d --- /dev/null +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClientException.scala @@ -0,0 +1,3 @@ +package kafka.zookeeper + +case class ZooKeeperClientException(message: String) extends RuntimeException(message) \ No newline at end of file From b8b9a200c7049075ef2b6d8cedfd7f6fd9aaa4f7 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 19 Jan 2025 09:38:03 +0800 Subject: [PATCH 5/6] update the license --- .../zookeeper/ZooKeeperClientException.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClientException.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClientException.scala index d0c48f894158d..c9dbed8949d8d 100644 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClientException.scala +++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClientException.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package kafka.zookeeper case class ZooKeeperClientException(message: String) extends RuntimeException(message) \ No newline at end of file From af6a24309c6a3ee07993ab28dfd4b88a622aec46 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 19 Jan 2025 10:08:06 +0800 Subject: [PATCH 6/6] remove ZooKeeperClientException.scala --- .../zookeeper/ZooKeeperClientException.scala | 20 ------------------- 1 file changed, 20 deletions(-) delete mode 100644 core/src/main/scala/kafka/zookeeper/ZooKeeperClientException.scala diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClientException.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClientException.scala deleted file mode 100644 index c9dbed8949d8d..0000000000000 --- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClientException.scala +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.zookeeper - -case class ZooKeeperClientException(message: String) extends RuntimeException(message) \ No newline at end of file