From 66de08b525902f3007b7e2a5b03246be33617c01 Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Mon, 18 Aug 2025 17:31:15 +0800 Subject: [PATCH 1/3] MINOR: Move RaftManager interface to raft module --- ...ftManager.scala => KafkaRaftManager.scala} | 34 +++---------------- .../scala/kafka/server/ControllerApis.scala | 2 +- .../NodeToControllerChannelManager.scala | 6 ++-- .../kafka/tools/TestRaftRequestHandler.scala | 2 +- .../scala/kafka/tools/TestRaftServer.scala | 4 +-- .../kafka/server/ControllerApisTest.scala | 3 +- .../org/apache/kafka/raft/RaftManager.java | 33 ++++++++++++++++++ 7 files changed, 45 insertions(+), 39 deletions(-) rename core/src/main/scala/kafka/raft/{RaftManager.scala => KafkaRaftManager.scala} (93%) create mode 100644 raft/src/main/java/org/apache/kafka/raft/RaftManager.java diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala similarity index 93% rename from core/src/main/scala/kafka/raft/RaftManager.scala rename to core/src/main/scala/kafka/raft/KafkaRaftManager.scala index 9e8ea38f8fd9a..794ac5b3c28af 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala @@ -20,10 +20,8 @@ import java.io.File import java.net.InetSocketAddress import java.nio.file.Files import java.nio.file.Paths -import java.util.OptionalInt +import java.util.{Optional, OptionalInt, Collection => JCollection, Map => JMap} import java.util.concurrent.CompletableFuture -import java.util.{Map => JMap} -import java.util.{Collection => JCollection} import kafka.server.KafkaConfig import kafka.utils.CoreUtils import kafka.utils.Logging @@ -40,7 +38,7 @@ import org.apache.kafka.common.requests.RequestHeader import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time, Utils} -import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, MetadataLogConfig, QuorumConfig, RaftClient, ReplicatedLog, TimingWheelExpirationService} +import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, MetadataLogConfig, QuorumConfig, RaftClient, RaftManager, ReplicatedLog, TimingWheelExpirationService} import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.common.Feature import org.apache.kafka.server.common.serialization.RecordSerde @@ -50,7 +48,6 @@ import org.apache.kafka.server.util.timer.SystemTimer import org.apache.kafka.storage.internals.log.{LogManager, UnifiedLog} import scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters._ object KafkaRaftManager { private def createLogDirectory(logDir: File, logDirName: String): File = { @@ -85,29 +82,6 @@ object KafkaRaftManager { } } -trait RaftManager[T] { - def handleRequest( - context: RequestContext, - header: RequestHeader, - request: ApiMessage, - createdTimeMs: Long - ): CompletableFuture[ApiMessage] - - def register( - listener: RaftClient.Listener[T] - ): Unit - - def leaderAndEpoch: LeaderAndEpoch - - def client: RaftClient[T] - - def replicatedLog: ReplicatedLog - - def voterNode(id: Int, listener: ListenerName): Option[Node] - - def recordSerde: RecordSerde[T] -} - class KafkaRaftManager[T]( clusterId: String, config: KafkaConfig, @@ -296,8 +270,8 @@ class KafkaRaftManager[T]( client.leaderAndEpoch } - override def voterNode(id: Int, listener: ListenerName): Option[Node] = { - client.voterNode(id, listener).toScala + override def voterNode(id: Int, listener: ListenerName): Optional[Node] = { + client.voterNode(id, listener) } override def recordSerde: RecordSerde[T] = serde diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index cc9a12c9aa303..57336d52eebea 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -24,7 +24,6 @@ import java.util.Map.Entry import java.util.concurrent.CompletableFuture import java.util.function.Consumer import kafka.network.RequestChannel -import kafka.raft.RaftManager import kafka.server.QuotaFactory.QuotaManagers import kafka.server.logger.RuntimeLoggerManager import kafka.server.metadata.KRaftMetadataCache @@ -55,6 +54,7 @@ import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.raft.RaftManager import org.apache.kafka.server.{ApiVersionManager, DelegationTokenManager, ProcessRole} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal} diff --git a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala index cd6b8e1d13484..ade5160b37d12 100644 --- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.raft.RaftManager import kafka.utils.Logging import org.apache.kafka.clients._ import org.apache.kafka.common.metrics.Metrics @@ -28,6 +27,7 @@ import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{Node, Reconfigurable} +import org.apache.kafka.raft.RaftManager import org.apache.kafka.server.common.{ApiMessageAndVersion, ControllerRequestCompletionHandler, NodeToControllerChannelManager} import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler} @@ -37,7 +37,7 @@ import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.atomic.AtomicReference import scala.collection.Seq import scala.jdk.CollectionConverters._ -import scala.jdk.OptionConverters.{RichOption, RichOptionalInt} +import scala.jdk.OptionConverters.{RichOption, RichOptional, RichOptionalInt} case class ControllerInformation( node: Option[Node], @@ -79,7 +79,7 @@ class RaftControllerNodeProvider( val saslMechanism: String ) extends ControllerNodeProvider with Logging { - private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, listenerName) + private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, listenerName).toScala override def getControllerInfo(): ControllerInformation = ControllerInformation(raftManager.leaderAndEpoch.leaderId.toScala.flatMap(idToNode), diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala index 2e9d8e2bb8a00..081fbec3c95d7 100644 --- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala +++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala @@ -18,7 +18,6 @@ package kafka.tools import kafka.network.RequestChannel -import kafka.raft.RaftManager import kafka.server.ApiRequestHandler import kafka.utils.Logging import org.apache.kafka.common.internals.FatalExitError @@ -26,6 +25,7 @@ import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, EndQuorumE import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage} import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, BeginQuorumEpochResponse, EndQuorumEpochResponse, FetchResponse, FetchSnapshotResponse, VoteResponse} import org.apache.kafka.common.utils.Time +import org.apache.kafka.raft.RaftManager import org.apache.kafka.server.ApiVersionManager import org.apache.kafka.server.common.RequestLocal diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index a47b9fd4d47f7..e1038c6941861 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit} import joptsimple.{OptionException, OptionSpec} import kafka.network.SocketServer -import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager} +import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager} import kafka.server.{KafkaConfig, KafkaRequestHandlerPool} import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.errors.InvalidConfigurationException @@ -37,7 +37,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok import org.apache.kafka.common.utils.{Exit, Time, Utils} import org.apache.kafka.common.{TopicPartition, Uuid, protocol} import org.apache.kafka.raft.errors.NotLeaderException -import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient} +import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, QuorumConfig, RaftClient, RaftManager} import org.apache.kafka.security.CredentialProvider import org.apache.kafka.server.SimpleApiVersionManager import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion} diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 901dbe74356c7..43c7d5aecf464 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -18,7 +18,6 @@ package kafka.server import kafka.network.RequestChannel -import kafka.raft.RaftManager import kafka.server.QuotaFactory.QuotaManagers import kafka.server.metadata.KRaftMetadataCache import org.apache.kafka.clients.admin.AlterConfigOp @@ -56,7 +55,7 @@ import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.network.metrics.RequestChannelMetrics import org.apache.kafka.network.Session -import org.apache.kafka.raft.QuorumConfig +import org.apache.kafka.raft.{QuorumConfig, RaftManager} import org.apache.kafka.server.SimpleApiVersionManager import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer} import org.apache.kafka.server.common.{ApiMessageAndVersion, FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock, RequestLocal} diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftManager.java b/raft/src/main/java/org/apache/kafka/raft/RaftManager.java new file mode 100644 index 0000000000000..a1c2e34512877 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/RaftManager.java @@ -0,0 +1,33 @@ +package org.apache.kafka.raft; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.server.common.serialization.RecordSerde; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +public interface RaftManager { + + CompletableFuture handleRequest( + RequestContext context, + RequestHeader header, + ApiMessage request, + long createdTimeMs + ); + + void register(RaftClient.Listener listener); + + LeaderAndEpoch leaderAndEpoch(); + + RaftClient client(); + + ReplicatedLog replicatedLog(); + + Optional voterNode(int id, ListenerName listener); + + RecordSerde recordSerde(); +} From 60feeffdd19ab14ddd2591b8f7ecda16d7c6b1fb Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Mon, 18 Aug 2025 18:28:29 +0800 Subject: [PATCH 2/3] Add license header to new file to fix build error --- .../java/org/apache/kafka/raft/RaftManager.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftManager.java b/raft/src/main/java/org/apache/kafka/raft/RaftManager.java index a1c2e34512877..f8f53771acb76 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftManager.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftManager.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.kafka.raft; import org.apache.kafka.common.Node; From fe544702d99fbed744f4b84e978b398fbc7b9ea7 Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Tue, 19 Aug 2025 07:48:51 +0800 Subject: [PATCH 3/3] Address chia's comments --- .../src/main/scala/kafka/raft/KafkaRaftManager.scala | 12 +----------- .../src/main/scala/kafka/server/ControllerApis.scala | 2 +- .../server/NodeToControllerChannelManager.scala | 2 +- core/src/main/scala/kafka/server/SharedServer.scala | 2 +- core/src/main/scala/kafka/tools/TestRaftServer.scala | 2 +- .../server/AllocateProducerIdsRequestTest.scala | 4 ++-- .../server/epoch/LeaderEpochIntegrationTest.scala | 2 +- .../main/java/org/apache/kafka/raft/RaftManager.java | 4 ---- 8 files changed, 8 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/kafka/raft/KafkaRaftManager.scala b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala index 794ac5b3c28af..f54efa4bb4649 100644 --- a/core/src/main/scala/kafka/raft/KafkaRaftManager.scala +++ b/core/src/main/scala/kafka/raft/KafkaRaftManager.scala @@ -38,7 +38,7 @@ import org.apache.kafka.common.requests.RequestHeader import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time, Utils} -import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, MetadataLogConfig, QuorumConfig, RaftClient, RaftManager, ReplicatedLog, TimingWheelExpirationService} +import org.apache.kafka.raft.{Endpoints, ExternalKRaftMetrics, FileQuorumStateStore, KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, MetadataLogConfig, QuorumConfig, RaftManager, ReplicatedLog, TimingWheelExpirationService} import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.common.Feature import org.apache.kafka.server.common.serialization.RecordSerde @@ -152,12 +152,6 @@ class KafkaRaftManager[T]( CoreUtils.swallow(dataDirLock.foreach(_.destroy()), this) } - override def register( - listener: RaftClient.Listener[T] - ): Unit = { - client.register(listener) - } - override def handleRequest( context: RequestContext, header: RequestHeader, @@ -266,10 +260,6 @@ class KafkaRaftManager[T]( (controllerListenerName, networkClient) } - override def leaderAndEpoch: LeaderAndEpoch = { - client.leaderAndEpoch - } - override def voterNode(id: Int, listener: ListenerName): Optional[Node] = { client.voterNode(id, listener) } diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 57336d52eebea..ac9a2d9eff1e7 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -1070,7 +1070,7 @@ class ControllerApis( EndpointType.CONTROLLER, clusterId, () => registrationsPublisher.describeClusterControllers(request.context.listenerName()), - () => raftManager.leaderAndEpoch.leaderId().orElse(-1) + () => raftManager.client.leaderAndEpoch.leaderId().orElse(-1) ) requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new DescribeClusterResponse(response.setThrottleTimeMs(requestThrottleMs))) diff --git a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala index ade5160b37d12..e689649c1efea 100644 --- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala @@ -82,7 +82,7 @@ class RaftControllerNodeProvider( private def idToNode(id: Int): Option[Node] = raftManager.voterNode(id, listenerName).toScala override def getControllerInfo(): ControllerInformation = - ControllerInformation(raftManager.leaderAndEpoch.leaderId.toScala.flatMap(idToNode), + ControllerInformation(raftManager.client.leaderAndEpoch.leaderId.toScala.flatMap(idToNode), listenerName, securityProtocol, saslMechanism) } diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 03f1c9b929e6a..aba9035cb7e94 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -342,7 +342,7 @@ class SharedServer( throw new RuntimeException("Unable to install metadata publishers.", t) } } - _raftManager.register(loader) + _raftManager.client.register(loader) debug("Completed SharedServer startup.") started = true } catch { diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index e1038c6941861..f16aa2c0be549 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -181,7 +181,7 @@ class TestRaftServer( private var claimedEpoch: Option[Int] = None - raftManager.register(this) + raftManager.client.register(this) override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch): Unit = { if (newLeaderAndEpoch.isLeader(config.nodeId)) { diff --git a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala index aa399985f8374..16a82fdca8b30 100644 --- a/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala @@ -34,7 +34,7 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) { def testAllocateProducersIdSentToController(): Unit = { val sourceBroker = cluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer] - val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt + val controllerId = sourceBroker.raftManager.client.leaderAndEpoch.leaderId().getAsInt val controllerServer = cluster.controllers.values().stream() .filter(_.config.nodeId == controllerId) .findFirst() @@ -50,7 +50,7 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) { def testAllocateProducersIdSentToNonController(): Unit = { val sourceBroker = cluster.brokers.values().stream().findFirst().get().asInstanceOf[BrokerServer] - val controllerId = sourceBroker.raftManager.leaderAndEpoch.leaderId().getAsInt + val controllerId = sourceBroker.raftManager.client.leaderAndEpoch.leaderId().getAsInt val controllerServer = cluster.controllers().values().stream() .filter(_.config.nodeId != controllerId) .findFirst() diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala index d678a497b53bb..f1ba2c7ac5ed6 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala @@ -293,7 +293,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness with Logging { } private def waitUntilQuorumLeaderElected(controllerServer: ControllerServer, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = { - val (leaderAndEpoch, _) = computeUntilTrue(controllerServer.raftManager.leaderAndEpoch, waitTime = timeout)(_.leaderId().isPresent) + val (leaderAndEpoch, _) = computeUntilTrue(controllerServer.raftManager.client.leaderAndEpoch, waitTime = timeout)(_.leaderId().isPresent) leaderAndEpoch.leaderId().orElseThrow(() => new AssertionError(s"Quorum Controller leader not elected after $timeout ms")) } diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftManager.java b/raft/src/main/java/org/apache/kafka/raft/RaftManager.java index f8f53771acb76..84ec24c067644 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftManager.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftManager.java @@ -36,10 +36,6 @@ CompletableFuture handleRequest( long createdTimeMs ); - void register(RaftClient.Listener listener); - - LeaderAndEpoch leaderAndEpoch(); - RaftClient client(); ReplicatedLog replicatedLog();