KAFKA-18540: Remove UpdateMetadataRequest from KafkaApisTest #18591

Open · wants to merge 6 commits into trunk
Changes from 4 commits
269 changes: 195 additions & 74 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -54,7 +54,9 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequ
import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
import org.apache.kafka.common.message.ShareFetchRequestData.{AcknowledgementBatch, ForgottenTopic}
import org.apache.kafka.common.message.ShareFetchResponseData.{AcquiredRecords, PartitionData, ShareFetchableTopicResponse}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.metadata.{TopicRecord, PartitionRecord, RegisterBrokerRecord}
import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection}
import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.message._
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
@@ -3561,6 +3563,112 @@ class KafkaApisTest extends Logging {
assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
}

@Test
def testUnauthorizedTopicMetadataRequest(): Unit = {
// 1. Set up broker information
val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val endpoints = new BrokerEndpointCollection()
endpoints.add(
new BrokerEndpoint()
.setHost("broker0")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setName(plaintextListener.value)
)
MetadataCacheTest.updateCache(metadataCache,
Seq(new RegisterBrokerRecord().setBrokerId(0).setRack("rack").setFenced(false).setEndPoints(endpoints))
)

// 2. Set up authorizer
val authorizer: Authorizer = mock(classOf[Authorizer])
val unauthorizedTopic = "unauthorized-topic"
val authorizedTopic = "authorized-topic"

val expectedActions = Seq(
new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 1, true, true),
new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, true, true)
)

when(authorizer.authorize(any[RequestContext], argThat((t: java.util.List[Action]) => t.containsAll(expectedActions.asJava))))
.thenAnswer { invocation =>
val actions = invocation.getArgument(1).asInstanceOf[util.List[Action]].asScala
actions.map { action =>
if (action.resourcePattern().name().equals(authorizedTopic))
AuthorizationResult.ALLOWED
else
AuthorizationResult.DENIED
}.asJava
}

// 3. Set up MetadataCache
val authorizedTopicId = Uuid.randomUuid()
val unauthorizedTopicId = Uuid.randomUuid()

val topicIds = new util.HashMap[String, Uuid]()
topicIds.put(authorizedTopic, authorizedTopicId)
topicIds.put(unauthorizedTopic, unauthorizedTopicId)
addTopicToMetadataCache(authorizedTopic, 1, topicId = authorizedTopicId)
addTopicToMetadataCache(unauthorizedTopic, 1, topicId = unauthorizedTopicId)

def createDummyPartitionRecord(topicId: Uuid) = {
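// Single partition led by broker 0, which is also the sole replica and ISR member.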
new PartitionRecord()
.setTopicId(topicId)
.setPartitionId(0)
.setLeader(0)
.setLeaderEpoch(0)
.setReplicas(Collections.singletonList(0))
.setIsr(Collections.singletonList(0))
}

val partitionRecords = Seq(authorizedTopicId, unauthorizedTopicId).map(createDummyPartitionRecord)
MetadataCacheTest.updateCache(metadataCache, partitionRecords)

// 4. Send TopicMetadataReq using topicId
val metadataReqByTopicId = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopicId, unauthorizedTopicId)).build()
val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
kafkaApis.handleTopicMetadataRequest(repByTopicId)
val metadataByTopicIdResp = verifyNoThrottling[MetadataResponse](repByTopicId)

val metadataByTopicId = metadataByTopicIdResp.data().topics().asScala.groupBy(_.topicId()).map(kv => (kv._1, kv._2.head))

metadataByTopicId.foreach { case (topicId, metadataResponseTopic) =>
if (topicId == unauthorizedTopicId) {
// Return TOPIC_AUTHORIZATION_FAILED for the unauthorized topic without leaking whether the topic id exists
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode())
// Do not return topic information on unauthorized error
assertNull(metadataResponseTopic.name())
} else {
assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
assertEquals(authorizedTopic, metadataResponseTopic.name())
}
}
kafkaApis.close()

// 5. Send TopicMetadataReq using topic name
reset(clientRequestQuotaManager, requestChannel)
val metadataReqByTopicName = new MetadataRequest.Builder(util.Arrays.asList(authorizedTopic, unauthorizedTopic), false).build()
val repByTopicName = buildRequest(metadataReqByTopicName, plaintextListener)
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
kafkaApis.handleTopicMetadataRequest(repByTopicName)
val metadataByTopicNameResp = verifyNoThrottling[MetadataResponse](repByTopicName)

val metadataByTopicName = metadataByTopicNameResp.data().topics().asScala.groupBy(_.name()).map(kv => (kv._1, kv._2.head))

metadataByTopicName.foreach { case (topicName, metadataResponseTopic) =>
if (topicName == unauthorizedTopic) {
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), metadataResponseTopic.errorCode())
// Do not return topic Id on unauthorized error
assertEquals(Uuid.ZERO_UUID, metadataResponseTopic.topicId())
} else {
assertEquals(Errors.NONE.code(), metadataResponseTopic.errorCode())
assertEquals(authorizedTopicId, metadataResponseTopic.topicId())
}
}
}

/**
* Verifies that sending a fetch request with version 9 works correctly when
* ReplicaManager.getLogConfig returns None.
Expand Down Expand Up @@ -8878,30 +8986,28 @@ class KafkaApisTest extends Logging {
@Test
def testDescribeClusterRequest(): Unit = {
val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val brokers = Seq(
new UpdateMetadataBroker()
.setId(0)
.setRack("rack")
.setEndpoints(Seq(
new UpdateMetadataEndpoint()
.setHost("broker0")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(plaintextListener.value)
).asJava),
new UpdateMetadataBroker()
.setId(1)
.setRack("rack")
.setEndpoints(Seq(
new UpdateMetadataEndpoint()
.setHost("broker1")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(plaintextListener.value)).asJava)
val endpoints = new BrokerEndpointCollection()
endpoints.add(
new BrokerEndpoint()
.setHost("broker0")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setName(plaintextListener.value)
)
endpoints.add(
new BrokerEndpoint()
.setHost("broker1")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setName(plaintextListener.value)
)
val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
0, 0, Seq.empty[UpdateMetadataPartitionState].asJava, brokers.asJava, Collections.emptyMap()).build()
MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest)

MetadataCacheTest.updateCache(metadataCache,
Seq(new RegisterBrokerRecord()
.setBrokerId(brokerId)
.setRack("rack")
.setFenced(false)
.setEndPoints(endpoints)))

val describeClusterRequest = new DescribeClusterRequest.Builder(new DescribeClusterRequestData()
.setIncludeClusterAuthorizedOperations(true)).build()
@@ -8924,35 +9030,37 @@
private def updateMetadataCacheWithInconsistentListeners(): (ListenerName, ListenerName) = {
val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val anotherListener = new ListenerName("LISTENER2")
val brokers = Seq(
new UpdateMetadataBroker()
.setId(0)
.setRack("rack")
.setEndpoints(Seq(
new UpdateMetadataEndpoint()
.setHost("broker0")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(plaintextListener.value),
new UpdateMetadataEndpoint()
.setHost("broker0")
.setPort(9093)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(anotherListener.value)
).asJava),
new UpdateMetadataBroker()
.setId(1)
.setRack("rack")
.setEndpoints(Seq(
new UpdateMetadataEndpoint()
.setHost("broker1")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(plaintextListener.value)).asJava)
)
val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
0, 0, Seq.empty[UpdateMetadataPartitionState].asJava, brokers.asJava, Collections.emptyMap()).build()
MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest)

val endpoints0 = new BrokerEndpointCollection()
endpoints0.add(
new BrokerEndpoint()
.setHost("broker0")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setName(plaintextListener.value)
)
endpoints0.add(
new BrokerEndpoint()
.setHost("broker0")
.setPort(9093)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setName(anotherListener.value)
)

val endpoints1 = new BrokerEndpointCollection()
endpoints1.add(
new BrokerEndpoint()
.setHost("broker1")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setName(plaintextListener.value)
)

MetadataCacheTest.updateCache(metadataCache,
Seq(new RegisterBrokerRecord().setBrokerId(0).setRack("rack").setFenced(false).setEndPoints(endpoints0),
new RegisterBrokerRecord().setBrokerId(1).setRack("rack").setFenced(false).setEndPoints(endpoints1))
)

(plaintextListener, anotherListener)
}

@@ -9142,51 +9250,64 @@
).asInstanceOf[T]
}

private def createBasicMetadataRequest(topic: String,
private def createBasicMetadata(topic: String,
numPartitions: Int,
brokerEpoch: Long,
numBrokers: Int,
topicId: Uuid): UpdateMetadataRequest = {
topicId: Uuid): Seq[ApiMessage] = {
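// Build the metadata as KRaft records: one TopicRecord, a PartitionRecord per
// partition, and a RegisterBrokerRecord per live broker.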

val results = new mutable.ArrayBuffer[ApiMessage]()
val topicRecord = new TopicRecord().setName(topic).setTopicId(topicId)
results += topicRecord

val replicas = List(0.asInstanceOf[Integer]).asJava

def createPartitionState(partition: Int) = new UpdateMetadataPartitionState()
.setTopicName(topic)
.setPartitionIndex(partition)
.setControllerEpoch(1)
def createPartitionRecord(partition: Int) = new PartitionRecord()
.setTopicId(topicId)
.setPartitionId(partition)
.setLeader(0)
.setLeaderEpoch(1)
.setReplicas(replicas)
.setZkVersion(0)
.setIsr(replicas)

val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val partitionStates = (0 until numPartitions).map(createPartitionState)
val partitionRecords = (0 until numPartitions).map(createPartitionRecord)
val liveBrokers = (0 until numBrokers).map(
brokerId => createMetadataBroker(brokerId, plaintextListener))
new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
0, brokerEpoch, partitionStates.asJava, liveBrokers.asJava, Collections.singletonMap(topic, topicId)).build()
brokerId => createMetadataBroker(brokerId, plaintextListener, brokerEpoch))
partitionRecords.foreach(record => results += record)
liveBrokers.foreach(record => results += record)

results.toSeq
}

private def setupBasicMetadataCache(topic: String, numPartitions: Int, numBrokers: Int, topicId: Uuid): Unit = {
val updateMetadataRequest = createBasicMetadataRequest(topic, numPartitions, 0, numBrokers, topicId)
MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest)
val updateMetadata = createBasicMetadata(topic, numPartitions, 0, numBrokers, topicId)
MetadataCacheTest.updateCache(metadataCache, updateMetadata)
}

private def addTopicToMetadataCache(topic: String, numPartitions: Int, numBrokers: Int = 1, topicId: Uuid = Uuid.ZERO_UUID): Unit = {
val updateMetadataRequest = createBasicMetadataRequest(topic, numPartitions, 0, numBrokers, topicId)
MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest)
val updateMetadata = createBasicMetadata(topic, numPartitions, 0, numBrokers, topicId)
MetadataCacheTest.updateCache(metadataCache, updateMetadata)
}

private def createMetadataBroker(brokerId: Int,
listener: ListenerName): UpdateMetadataBroker = {
new UpdateMetadataBroker()
.setId(brokerId)
.setRack("rack")
.setEndpoints(Seq(new UpdateMetadataEndpoint()
listener: ListenerName,
brokerEpoch: Long): RegisterBrokerRecord = {
val endpoints = new BrokerEndpointCollection()
endpoints.add(
new BrokerEndpoint()
.setHost("broker" + brokerId)
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
.setListener(listener.value)).asJava)
.setName(listener.value)
)

new RegisterBrokerRecord()
.setBrokerId(brokerId)
.setRack("rack")
.setFenced(false)
.setEndPoints(endpoints)
.setBrokerEpoch(brokerEpoch)
}

@Test
23 changes: 23 additions & 0 deletions core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -55,6 +55,29 @@ object MetadataCacheTest {
MetadataCache.kRaftMetadataCache(1, () => KRaftVersion.KRAFT_VERSION_0)
)

def updateCache(cache: MetadataCache, records: Seq[ApiMessage]): Unit = {
cache match {
case c: KRaftMetadataCache => {
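// Rebuild from a copy of the current image whose ClusterImage is empty, so the
// replayed RegisterBrokerRecords fully define broker membership while existing
// topic/config state is carried over.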
val image = c.currentImage()
val partialImage = new MetadataImage(
new MetadataProvenance(100L, 10, 1000L, true),
image.features(),
ClusterImage.EMPTY,
image.topics(),
image.configs(),
image.clientQuotas(),
image.producerIds(),
image.acls(),
image.scram(),
image.delegationTokens())
val delta = new MetadataDelta.Builder().setImage(partialImage).build()
records.foreach(record => delta.replay(record))
c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L, true)))
}
case _ => throw new RuntimeException("Unsupported cache type")
}
}

def updateCache(cache: MetadataCache, request: UpdateMetadataRequest, records: Seq[ApiMessage] = List()): Unit = {
Review thread on this method:

Contributor (Author): This method will be removed in https://issues.apache.org/jira/browse/KAFKA-18578

Member: @TaiJuWu I need KAFKA-18578 to unblock a different PR - are you planning to work on it soon?

Contributor (Author, @TaiJuWu, Jan 19, 2025): I will start that work tonight. If it's not progressing quickly enough, feel free to take it over, thanks!

Member: Thank you, that works great.

cache match {
case c: ZkMetadataCache => c.updateMetadata(0, request)
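For reference, a minimal sketch of how a test can now seed the KRaft metadata cache through the new record-based helper instead of building an UpdateMetadataRequest. The broker, topic, and partition values are illustrative; the helper and record types are the ones shown in the diff above.

```scala
// Illustrative values only; updateCache and the record builders are from this PR.
val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val endpoints = new BrokerEndpointCollection()
endpoints.add(
  new BrokerEndpoint()
    .setHost("broker0")
    .setPort(9092)
    .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
    .setName(listener.value)
)
val topicId = Uuid.randomUuid()
MetadataCacheTest.updateCache(metadataCache, Seq(
  // The broker registration repopulates the emptied ClusterImage.
  new RegisterBrokerRecord().setBrokerId(0).setRack("rack").setFenced(false).setEndPoints(endpoints),
  // Topic and partition records are replayed on top of the carried-over topics image.
  new TopicRecord().setName("test-topic").setTopicId(topicId),
  new PartitionRecord().setTopicId(topicId).setPartitionId(0)
    .setLeader(0).setLeaderEpoch(0)
    .setReplicas(Collections.singletonList(0))
    .setIsr(Collections.singletonList(0))
))
```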