Skip to content

Commit 3996e90

Browse files
authored
Remove casts to KRaftMetadataCache (#18579)
Reviewers: Mickael Maison <[email protected]>
1 parent b90ef86 commit 3996e90

File tree

4 files changed

+13
-10
lines changed

4 files changed

+13
-10
lines changed

core/src/main/scala/kafka/server/KafkaApis.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -2368,7 +2368,7 @@ class KafkaApis(val requestChannel: RequestChannel,
23682368
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
23692369
describeClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
23702370
} else {
2371-
val result = metadataCache.asInstanceOf[KRaftMetadataCache].describeClientQuotas(describeClientQuotasRequest.data())
2371+
val result = metadataCache.describeClientQuotas(describeClientQuotasRequest.data())
23722372
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
23732373
result.setThrottleTimeMs(requestThrottleMs)
23742374
new DescribeClientQuotasResponse(result)
@@ -2383,7 +2383,7 @@ class KafkaApis(val requestChannel: RequestChannel,
23832383
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
23842384
describeUserScramCredentialsRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
23852385
} else {
2386-
val result = metadataCache.asInstanceOf[KRaftMetadataCache].describeScramCredentials(describeUserScramCredentialsRequest.data())
2386+
val result = metadataCache.describeScramCredentials(describeUserScramCredentialsRequest.data())
23872387
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
23882388
new DescribeUserScramCredentialsResponse(result.setThrottleTimeMs(requestThrottleMs)))
23892389
}

core/src/main/scala/kafka/server/MetadataCache.scala

+7-3
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
package kafka.server
1919

20-
import kafka.server.metadata.KRaftMetadataCache
20+
import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
2121
import org.apache.kafka.admin.BrokerMetadata
22-
import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
22+
import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, DescribeClientQuotasResponseData, DescribeUserScramCredentialsRequestData, DescribeUserScramCredentialsResponseData, MetadataResponseData, UpdateMetadataRequestData}
2323
import org.apache.kafka.common.network.ListenerName
2424
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
2525
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}
@@ -39,7 +39,7 @@ sealed trait CachedControllerId {
3939
case class ZkCachedControllerId(id: Int) extends CachedControllerId
4040
case class KRaftCachedControllerId(id: Int) extends CachedControllerId
4141

42-
trait MetadataCache {
42+
trait MetadataCache extends ConfigRepository {
4343
/**
4444
* Return topic metadata for a given set of topics and listener. See KafkaApis#handleTopicMetadataRequest for details
4545
* on the use of the two boolean flags.
@@ -113,6 +113,10 @@ trait MetadataCache {
113113
def getRandomAliveBrokerId: Option[Int]
114114

115115
def features(): FinalizedFeatures
116+
117+
def describeClientQuotas(request: DescribeClientQuotasRequestData): DescribeClientQuotasResponseData
118+
119+
def describeScramCredentials(request: DescribeUserScramCredentialsRequestData): DescribeUserScramCredentialsResponseData
116120
}
117121

118122
object MetadataCache {

core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ import scala.util.control.Breaks._
4949
class KRaftMetadataCache(
5050
val brokerId: Int,
5151
val kraftVersionSupplier: Supplier[KRaftVersion]
52-
) extends MetadataCache with Logging with ConfigRepository {
52+
) extends MetadataCache with Logging {
5353
this.logIdent = s"[MetadataCache brokerId=$brokerId] "
5454

5555
// This is the cache state. Every MetadataImage instance is immutable, and updates
@@ -539,11 +539,11 @@ class KRaftMetadataCache(
539539
override def config(configResource: ConfigResource): Properties =
540540
_currentImage.configs().configProperties(configResource)
541541

542-
def describeClientQuotas(request: DescribeClientQuotasRequestData): DescribeClientQuotasResponseData = {
542+
override def describeClientQuotas(request: DescribeClientQuotasRequestData): DescribeClientQuotasResponseData = {
543543
_currentImage.clientQuotas().describe(request)
544544
}
545545

546-
def describeScramCredentials(request: DescribeUserScramCredentialsRequestData): DescribeUserScramCredentialsResponseData = {
546+
override def describeScramCredentials(request: DescribeUserScramCredentialsRequestData): DescribeUserScramCredentialsResponseData = {
547547
_currentImage.scram().describe(request)
548548
}
549549

core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
2828
import java.util.{Collections, Optional, Properties}
2929
import java.{time, util}
3030
import kafka.integration.KafkaServerTestHarness
31-
import kafka.server.metadata.KRaftMetadataCache
3231
import kafka.server.KafkaConfig
3332
import kafka.utils.TestUtils._
3433
import kafka.utils.{Log4jController, TestInfoUtils, TestUtils}
@@ -3613,7 +3612,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
36133612

36143613
def validateLogConfig(compressionType: String): Unit = {
36153614
ensureConsistentKRaftMetadata()
3616-
val topicProps = brokers.head.metadataCache.asInstanceOf[KRaftMetadataCache].topicConfig(topic)
3615+
val topicProps = brokers.head.metadataCache.topicConfig(topic)
36173616
val logConfig = LogConfig.fromProps(Collections.emptyMap[String, AnyRef], topicProps)
36183617

36193618
assertEquals(compressionType, logConfig.originals.get(TopicConfig.COMPRESSION_TYPE_CONFIG))

0 commit comments

Comments
 (0)