Skip to content

Commit b2b2408

Browse files
authored
KAFKA-18360 Remove zookeeper configurations (apache#18566)
Remove broker.id.generation.enable and reserved.broker.max.id, which are not used in KRaft mode. Remove inter.broker.protocol.version, which is not used in KRaft mode. Reviewers: PoAn Yang <[email protected]>, Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent a3d9d88 commit b2b2408

File tree

12 files changed

+47
-161
lines changed

12 files changed

+47
-161
lines changed

core/src/main/scala/kafka/server/KafkaConfig.scala

+3-28
Original file line numberDiff line numberDiff line change
@@ -389,28 +389,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
389389
val uncleanLeaderElectionCheckIntervalMs: Long = getLong(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG)
390390
def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG)
391391

392-
// We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0`
393-
// is passed, `0.10.0-IV0` may be picked)
394-
val interBrokerProtocolVersionString = getString(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)
395-
val interBrokerProtocolVersion = if (processRoles.isEmpty) {
396-
MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
397-
} else {
398-
if (originals.containsKey(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)) {
399-
// A user-supplied IBP was given
400-
val configuredVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
401-
if (!configuredVersion.isKRaftSupported) {
402-
throw new ConfigException(s"A non-KRaft version $interBrokerProtocolVersionString given for ${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG}. " +
403-
s"The minimum version is ${MetadataVersion.MINIMUM_KRAFT_VERSION}")
404-
} else {
405-
warn(s"${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG} is deprecated in KRaft mode as of 3.3 and will only " +
406-
s"be read when first upgrading from a KRaft prior to 3.3. See kafka-storage.sh help for details on setting " +
407-
s"the metadata.version for a new KRaft cluster.")
408-
}
409-
}
410-
// In KRaft mode, we pin this value to the minimum KRaft-supported version. This prevents inadvertent usage of
411-
// the static IBP config in broker components running in KRaft mode
412-
MetadataVersion.MINIMUM_KRAFT_VERSION
413-
}
392+
// This will be removed soon. See KAFKA-18366.
393+
val interBrokerProtocolVersion = MetadataVersion.MINIMUM_KRAFT_VERSION
414394

415395
/** ********* Controlled shutdown configuration ***********/
416396
val controlledShutdownEnable = getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG)
@@ -713,15 +693,10 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
713693
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
714694
validateAdvertisedControllerListenersNonEmptyForKRaftController()
715695
validateControllerListenerNamesMustAppearInListenersForKRaftController()
716-
} else {
717-
// controller listener names must be empty when not in KRaft mode
718-
require(controllerListenerNames.isEmpty,
719-
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must be empty when not running in KRaft mode: ${controllerListenerNames.asJava}")
720696
}
721697

722698
val listenerNames = listeners.map(_.listenerName).toSet
723-
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
724-
// validations for all broker setups (i.e. broker-only and co-located)
699+
if (processRoles.contains(ProcessRole.BrokerRole)) {
725700
validateAdvertisedBrokerListenersNonEmptyForBroker()
726701
require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
727702
s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +

core/src/main/scala/kafka/server/KafkaRaftServer.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,7 @@ object KafkaRaftServer {
182182
}
183183

184184
// Load the BootstrapMetadata.
185-
val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir,
186-
Optional.ofNullable(config.interBrokerProtocolVersionString))
185+
val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir, Optional.empty())
187186
val bootstrapMetadata = bootstrapDirectory.read()
188187
(metaPropsEnsemble, bootstrapMetadata)
189188
}

core/src/main/scala/kafka/tools/StorageTool.scala

+3-6
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
3232
import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
3333
import org.apache.kafka.raft.{DynamicVoters, QuorumConfig}
3434
import org.apache.kafka.server.ProcessRole
35-
import org.apache.kafka.server.config.ReplicationConfigs
3635

3736
import java.util
3837
import scala.collection.mutable
@@ -129,11 +128,9 @@ object StorageTool extends Logging {
129128
setIgnoreFormatted(namespace.getBoolean("ignore_formatted")).
130129
setControllerListenerName(config.controllerListenerNames.head).
131130
setMetadataLogDirectory(config.metadataLogDir)
132-
Option(namespace.getString("release_version")) match {
133-
case Some(releaseVersion) => formatter.setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion))
134-
case None => Option(config.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).
135-
foreach(v => formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString)))
136-
}
131+
Option(namespace.getString("release_version")).foreach(
132+
releaseVersion => formatter.
133+
setReleaseVersion(MetadataVersion.fromVersionString(releaseVersion)))
137134
Option(namespace.getList[String]("feature")).foreach(
138135
featureNamesAndLevels(_).foreachEntry {
139136
(k, v) => formatter.setFeatureLevel(k, v)

core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import org.apache.kafka.server.authorizer.AuthorizationResult;
6565
import org.apache.kafka.server.authorizer.Authorizer;
6666
import org.apache.kafka.server.common.KRaftVersion;
67-
import org.apache.kafka.server.common.MetadataVersion;
6867
import org.apache.kafka.server.config.KRaftConfigs;
6968

7069
import org.junit.jupiter.api.Test;
@@ -545,7 +544,6 @@ KafkaConfig createKafkaDefaultConfig() {
545544
properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, voterId + "@localhost:9093");
546545
properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL");
547546
properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,SSL:SSL");
548-
TestUtils.setIbpVersion(properties, MetadataVersion.latestProduction());
549547
return new KafkaConfig(properties);
550548
}
551549
}

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

+6-6
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,12 @@ class KafkaApisTest extends Logging {
158158
metrics.close()
159159
}
160160

161-
def createKafkaApis(interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latestTesting,
162-
authorizer: Option[Authorizer] = None,
163-
configRepository: ConfigRepository = new MockConfigRepository(),
164-
overrideProperties: Map[String, String] = Map.empty,
165-
featureVersions: Seq[FeatureVersion] = Seq.empty): KafkaApis = {
161+
def createKafkaApis(
162+
authorizer: Option[Authorizer] = None,
163+
configRepository: ConfigRepository = new MockConfigRepository(),
164+
overrideProperties: Map[String, String] = Map.empty,
165+
featureVersions: Seq[FeatureVersion] = Seq.empty
166+
): KafkaApis = {
166167

167168
val properties = TestUtils.createBrokerConfig(brokerId)
168169
properties.put(KRaftConfigs.NODE_ID_CONFIG, brokerId.toString)
@@ -171,7 +172,6 @@ class KafkaApisTest extends Logging {
171172
properties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$voterId@localhost:9093")
172173

173174
overrideProperties.foreach( p => properties.put(p._1, p._2))
174-
TestUtils.setIbpVersion(properties, interBrokerProtocolVersion)
175175
val config = new KafkaConfig(properties)
176176

177177
val listenerType = ListenerType.BROKER

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

+16-48
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@ class KafkaConfigTest {
159159
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props1))
160160
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props2))
161161
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props3))
162-
163162
}
164163

165164
@Test
@@ -187,7 +186,7 @@ class KafkaConfigTest {
187186
val advertisedHostName = "routable-host"
188187
val advertisedPort = 1234
189188

190-
val props = TestUtils.createBrokerConfig(0)
189+
val props = createDefaultConfig()
191190
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
192191

193192
val serverConfig = KafkaConfig.fromProps(props)
@@ -617,29 +616,6 @@ class KafkaConfigTest {
617616
assertEquals(conf.effectiveAdvertisedBrokerListeners, listenerListToEndPoints("PLAINTEXT://:9092"))
618617
}
619618

620-
@Test
621-
def testVersionConfiguration(): Unit = {
622-
val props = new Properties()
623-
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
624-
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
625-
props.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
626-
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
627-
val conf = KafkaConfig.fromProps(props)
628-
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, conf.interBrokerProtocolVersion)
629-
630-
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0.0-IV1")
631-
val conf2 = KafkaConfig.fromProps(props)
632-
assertEquals(MetadataVersion.IBP_3_0_IV1, conf2.interBrokerProtocolVersion)
633-
634-
// check that patch version doesn't affect equality
635-
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.0.1-IV1")
636-
val conf3 = KafkaConfig.fromProps(props)
637-
assertEquals(MetadataVersion.IBP_3_0_IV1, conf3.interBrokerProtocolVersion)
638-
639-
//check that latest is newer than 3.0.1-IV0
640-
assertTrue(MetadataVersion.latestTesting.isAtLeast(conf3.interBrokerProtocolVersion))
641-
}
642-
643619
private def isValidKafkaConfig(props: Properties): Boolean = {
644620
try {
645621
KafkaConfig.fromProps(props)
@@ -1406,27 +1382,30 @@ class KafkaConfigTest {
14061382
}
14071383

14081384
@Test
1409-
def testRejectsNegativeNodeIdForRaftBasedBrokerCaseWithAutoGenEnabled(): Unit = {
1410-
// -1 is the default for both node.id and broker.id
1385+
def testAcceptsLargeId(): Unit = {
1386+
val largeBrokerId = 2000
14111387
val props = new Properties()
14121388
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
1413-
assertFalse(isValidKafkaConfig(props))
1389+
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092")
1390+
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
1391+
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
1392+
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, largeBrokerId.toString)
1393+
KafkaConfig.fromProps(props)
14141394
}
14151395

14161396
@Test
1417-
def testRejectsNegativeNodeIdForRaftBasedControllerCaseWithAutoGenEnabled(): Unit = {
1418-
// -1 is the default for both node.id and broker.id
1419-
val props = new Properties()
1420-
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
1397+
def testRejectsNegativeNodeId(): Unit = {
1398+
val props = createDefaultConfig()
1399+
props.remove(ServerConfigs.BROKER_ID_CONFIG)
1400+
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "-1")
14211401
assertFalse(isValidKafkaConfig(props))
14221402
}
14231403

14241404
@Test
1425-
def testRejectsNegativeNodeId(): Unit = {
1426-
// -1 is the default for both node.id and broker.id
1427-
val props = new Properties()
1428-
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker")
1429-
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
1405+
def testRejectsNegativeBrokerId(): Unit = {
1406+
val props = createDefaultConfig()
1407+
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "-1")
1408+
props.remove(KRaftConfigs.NODE_ID_CONFIG)
14301409
assertFalse(isValidKafkaConfig(props))
14311410
}
14321411

@@ -1613,17 +1592,6 @@ class KafkaConfigTest {
16131592
assertThrows(classOf[ConfigException], () => new KafkaConfig(props)).getMessage)
16141593
}
16151594

1616-
@Test
1617-
def testIgnoreUserInterBrokerProtocolVersionKRaft(): Unit = {
1618-
for (ibp <- Seq("3.0", "3.1", "3.2")) {
1619-
val props = new Properties()
1620-
props.putAll(kraftProps())
1621-
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, ibp)
1622-
val config = new KafkaConfig(props)
1623-
assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
1624-
}
1625-
}
1626-
16271595
@Test
16281596
def testDefaultInterBrokerProtocolVersionKRaft(): Unit = {
16291597
val props = new Properties()

core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala

+4-5
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadat
2626
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
2727
import org.apache.kafka.raft.QuorumConfig
2828
import org.apache.kafka.network.SocketServerConfigs
29-
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
29+
import org.apache.kafka.server.config.{KRaftConfigs, ServerLogConfigs}
3030
import org.apache.kafka.server.common.MetadataVersion
3131
import org.apache.kafka.test.TestUtils
3232
import org.junit.jupiter.api.Assertions._
@@ -262,7 +262,7 @@ class KafkaRaftServerTest {
262262
}
263263

264264
@Test
265-
def testKRaftUpdateWithIBP(): Unit = {
265+
def testKRaftUpdateAt3_3_IV1(): Unit = {
266266
val clusterId = clusterIdBase64
267267
val nodeId = 0
268268
val metaProperties = new MetaProperties.Builder().
@@ -278,10 +278,9 @@ class KafkaRaftServerTest {
278278
configProperties.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
279279
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:9093")
280280
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
281-
configProperties.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "3.3-IV1")
282281

283282
val (metaPropertiesEnsemble, bootstrapMetadata) =
284-
invokeLoadMetaProperties(metaProperties, configProperties, None)
283+
invokeLoadMetaProperties(metaProperties, configProperties, Some(MetadataVersion.IBP_3_3_IV1))
285284

286285
assertEquals(metaProperties, metaPropertiesEnsemble.logDirProps().values().iterator().next())
287286
assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty)
@@ -290,7 +289,7 @@ class KafkaRaftServerTest {
290289
}
291290

292291
@Test
293-
def testKRaftUpdateWithoutIBP(): Unit = {
292+
def testKRaftUpdate(): Unit = {
294293
val clusterId = clusterIdBase64
295294
val nodeId = 0
296295
val metaProperties = new MetaProperties.Builder().

core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala

+14-12
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBat
3333
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
3434
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
3535
import org.apache.kafka.common.utils.{LogContext, Time}
36-
import org.apache.kafka.server.config.ReplicationConfigs
3736
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, OffsetAndEpoch}
3837
import org.apache.kafka.server.network.BrokerEndPoint
3938
import org.apache.kafka.storage.internals.log.LogAppendInfo
@@ -78,13 +77,16 @@ class ReplicaFetcherThreadTest {
7877
TestUtils.clearYammerMetrics()
7978
}
8079

81-
private def createReplicaFetcherThread(name: String,
82-
fetcherId: Int,
83-
brokerConfig: KafkaConfig,
84-
failedPartitions: FailedPartitions,
85-
replicaMgr: ReplicaManager,
86-
quota: ReplicaQuota,
87-
leaderEndpointBlockingSend: BlockingSend): ReplicaFetcherThread = {
80+
private def createReplicaFetcherThread(
81+
name: String,
82+
fetcherId: Int,
83+
brokerConfig: KafkaConfig,
84+
failedPartitions: FailedPartitions,
85+
replicaMgr: ReplicaManager,
86+
quota: ReplicaQuota,
87+
leaderEndpointBlockingSend: BlockingSend,
88+
metadataVersion: MetadataVersion = MetadataVersion.latestTesting()
89+
): ReplicaFetcherThread = {
8890
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${leaderEndpointBlockingSend.brokerEndPoint().id}, fetcherId=$fetcherId] ")
8991
val fetchSessionHandler = new FetchSessionHandler(logContext, leaderEndpointBlockingSend.brokerEndPoint().id)
9092
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, leaderEndpointBlockingSend, fetchSessionHandler,
@@ -96,7 +98,7 @@ class ReplicaFetcherThreadTest {
9698
replicaMgr,
9799
quota,
98100
logContext.logPrefix,
99-
() => brokerConfig.interBrokerProtocolVersion)
101+
() => metadataVersion)
100102
}
101103

102104
@Test
@@ -179,9 +181,8 @@ class ReplicaFetcherThreadTest {
179181
verifyFetchLeaderEpochOnFirstFetch(MetadataVersion.latestTesting, epochFetchCount = 0)
180182
}
181183

182-
private def verifyFetchLeaderEpochOnFirstFetch(ibp: MetadataVersion, epochFetchCount: Int): Unit = {
184+
private def verifyFetchLeaderEpochOnFirstFetch(metadataVersion: MetadataVersion, epochFetchCount: Int): Unit = {
183185
val props = TestUtils.createBrokerConfig(1)
184-
props.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, ibp.version)
185186
val config = KafkaConfig.fromProps(props)
186187

187188
//Setup all dependencies
@@ -219,7 +220,8 @@ class ReplicaFetcherThreadTest {
219220
failedPartitions,
220221
replicaManager,
221222
UNBOUNDED_QUOTA,
222-
mockNetwork
223+
mockNetwork,
224+
metadataVersion
223225
)
224226
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), 0L), t1p1 -> initialFetchState(Some(topicId1), 0L)))
225227

core/src/test/scala/unit/kafka/tools/StorageToolTest.scala

-14
Original file line numberDiff line numberDiff line change
@@ -357,26 +357,12 @@ Found problem:
357357
"Failed to find content in output: " + stream.toString())
358358
}
359359

360-
@Test
361-
def testFormatWithReleaseVersionDefault(): Unit = {
362-
val availableDirs = Seq(TestUtils.tempDir())
363-
val properties = new Properties()
364-
properties.putAll(defaultStaticQuorumProperties)
365-
properties.setProperty("log.dirs", availableDirs.mkString(","))
366-
properties.setProperty("inter.broker.protocol.version", "3.7")
367-
val stream = new ByteArrayOutputStream()
368-
assertEquals(0, runFormatCommand(stream, properties))
369-
assertTrue(stream.toString().contains("3.7-IV4"),
370-
"Failed to find content in output: " + stream.toString())
371-
}
372-
373360
@Test
374361
def testFormatWithReleaseVersionDefaultAndReleaseVersion(): Unit = {
375362
val availableDirs = Seq(TestUtils.tempDir())
376363
val properties = new Properties()
377364
properties.putAll(defaultStaticQuorumProperties)
378365
properties.setProperty("log.dirs", availableDirs.mkString(","))
379-
properties.setProperty("inter.broker.protocol.version", "3.7")
380366
val stream = new ByteArrayOutputStream()
381367
assertEquals(0, runFormatCommand(stream, properties, Seq(
382368
"--release-version", "3.6-IV0",

core/src/test/scala/unit/kafka/utils/TestUtils.scala

-4
Original file line numberDiff line numberDiff line change
@@ -314,10 +314,6 @@ object TestUtils extends Logging {
314314
props
315315
}
316316

317-
def setIbpVersion(config: Properties, version: MetadataVersion): Unit = {
318-
config.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, version.version)
319-
}
320-
321317
def createAdminClient[B <: KafkaBroker](
322318
brokers: Seq[B],
323319
listenerName: ListenerName,

0 commit comments

Comments
 (0)