Skip to content

Commit 826cc54

Browse files
committed
KAFKA-18559: Cleanup FinalizedFeatures
JIRA: KAFKA-18559 Cleanup the zk logic and test in `FinalizedFeatures`
1 parent 8cc560e commit 826cc54

File tree

9 files changed

+16
-42
lines changed

9 files changed

+16
-42
lines changed

core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -558,8 +558,7 @@ class KRaftMetadataCache(
558558
}
559559
new FinalizedFeatures(image.features().metadataVersion(),
560560
finalizedFeatures,
561-
image.highestOffsetAndEpoch().offset,
562-
true)
561+
image.highestOffsetAndEpoch().offset)
563562
}
564563
}
565564

core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala

+1-3
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
8888
new FinalizedFeatures(
8989
MetadataVersion.latestTesting(),
9090
Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
91-
0,
92-
true
93-
)
91+
0)
9492
}
9593

9694
when(metadataCache.metadataVersion())

core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala

+2-6
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,7 @@ class TransactionStateManagerTest {
7575
new FinalizedFeatures(
7676
MetadataVersion.latestTesting(),
7777
Collections.singletonMap(TransactionVersion.FEATURE_NAME, TransactionVersion.TV_2.featureLevel()),
78-
0,
79-
true
80-
)
78+
0)
8179
}
8280

8381
val metrics = new Metrics()
@@ -1337,9 +1335,7 @@ class TransactionStateManagerTest {
13371335
new FinalizedFeatures(
13381336
MetadataVersion.latestTesting(),
13391337
Collections.singletonMap(TransactionVersion.FEATURE_NAME, transactionVersion.featureLevel()),
1340-
0,
1341-
true
1342-
)
1338+
0)
13431339
}
13441340
val transactionManager = new TransactionStateManager(0, scheduler,
13451341
replicaManager, metadataCache, txnConfig, time, metrics)

core/src/test/scala/unit/kafka/network/ProcessorTest.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class ProcessorTest {
3636
val requestHeader = RequestTestUtils.serializeRequestHeader(
3737
new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0))
3838
val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true,
39-
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true))
39+
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0))
4040
assertThrows(classOf[InvalidRequestException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable,
4141
"INIT_PRODUCER_ID with listener type CONTROLLER should throw InvalidRequestException exception")
4242
}
@@ -46,7 +46,7 @@ class ProcessorTest {
4646
val requestHeader = RequestTestUtils.serializeRequestHeader(
4747
new RequestHeader(ApiKeys.PRODUCE, 0, "clientid", 0))
4848
val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true,
49-
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true))
49+
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0))
5050
assertThrows(classOf[UnsupportedVersionException], (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): Executable,
5151
"PRODUCE v0 should throw UnsupportedVersionException exception")
5252
}

core/src/test/scala/unit/kafka/network/SocketServerTest.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class SocketServerTest {
8484
TestUtils.clearYammerMetrics()
8585

8686
private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true,
87-
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true))
87+
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0))
8888
var server: SocketServer = _
8989
val sockets = new ArrayBuffer[Socket]
9090

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

+2-4
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ class KafkaApisTest extends Logging {
177177
enabledApis,
178178
BrokerFeatures.defaultSupportedFeatures(true),
179179
true,
180-
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0, true))
180+
() => new FinalizedFeatures(MetadataVersion.latestTesting(), Collections.emptyMap[String, java.lang.Short], 0))
181181

182182
when(groupCoordinator.isNewGroupCoordinator).thenReturn(config.isNewGroupCoordinatorEnabled)
183183
setupFeatures(featureVersions)
@@ -218,9 +218,7 @@ class KafkaApisTest extends Logging {
218218
featureVersions.map { featureVersion =>
219219
featureVersion.featureName -> featureVersion.featureLevel.asInstanceOf[java.lang.Short]
220220
}.toMap.asJava,
221-
0,
222-
true
223-
)
221+
0)
224222
}
225223

226224
case _ => throw new IllegalStateException("Test must set an instance of KRaftMetadataCache")

metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ public void onMetadataUpdate(
5757
if (delta.featuresDelta() != null) {
5858
FinalizedFeatures newFinalizedFeatures = new FinalizedFeatures(newImage.features().metadataVersion(),
5959
newImage.features().finalizedVersions(),
60-
newImage.provenance().lastContainedOffset(),
61-
true);
60+
newImage.provenance().lastContainedOffset()
61+
);
6262
if (!newFinalizedFeatures.equals(finalizedFeatures)) {
6363
log.info("Loaded new metadata {}.", newFinalizedFeatures);
6464
finalizedFeatures = newFinalizedFeatures;

server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java

+4-10
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,19 @@ public final class FinalizedFeatures {
2727
private final long finalizedFeaturesEpoch;
2828

2929
public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
30-
return new FinalizedFeatures(version, Collections.emptyMap(), -1, true);
30+
return new FinalizedFeatures(version, Collections.emptyMap(), -1);
3131
}
3232

3333
public FinalizedFeatures(
3434
MetadataVersion metadataVersion,
3535
Map<String, Short> finalizedFeatures,
36-
long finalizedFeaturesEpoch,
37-
boolean kraftMode
36+
long finalizedFeaturesEpoch
3837
) {
38+
Objects.requireNonNull(metadataVersion);
3939
this.metadataVersion = metadataVersion;
4040
this.finalizedFeatures = new HashMap<>(finalizedFeatures);
4141
this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
42-
// In KRaft mode, we always include the metadata version in the features map.
43-
// In ZK mode, we never include it.
44-
if (kraftMode) {
45-
this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
46-
} else {
47-
this.finalizedFeatures.remove(MetadataVersion.FEATURE_NAME);
48-
}
42+
this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
4943
}
5044

5145
public MetadataVersion metadataVersion() {

server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java

+1-12
Original file line numberDiff line numberDiff line change
@@ -24,27 +24,16 @@
2424
import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
2525
import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
2626
import static org.junit.jupiter.api.Assertions.assertEquals;
27-
import static org.junit.jupiter.api.Assertions.assertNull;
2827

2928
class FinalizedFeaturesTest {
3029
@Test
3130
public void testKRaftModeFeatures() {
3231
FinalizedFeatures finalizedFeatures = new FinalizedFeatures(MINIMUM_KRAFT_VERSION,
33-
Collections.singletonMap("foo", (short) 2), 123, true);
32+
Collections.singletonMap("foo", (short) 2), 123);
3433
assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
3534
finalizedFeatures.finalizedFeatures().get(FEATURE_NAME));
3635
assertEquals((short) 2,
3736
finalizedFeatures.finalizedFeatures().get("foo"));
3837
assertEquals(2, finalizedFeatures.finalizedFeatures().size());
3938
}
40-
41-
@Test
42-
public void testZkModeFeatures() {
43-
FinalizedFeatures finalizedFeatures = new FinalizedFeatures(MINIMUM_KRAFT_VERSION,
44-
Collections.singletonMap("foo", (short) 2), 123, false);
45-
assertNull(finalizedFeatures.finalizedFeatures().get(FEATURE_NAME));
46-
assertEquals((short) 2,
47-
finalizedFeatures.finalizedFeatures().get("foo"));
48-
assertEquals(1, finalizedFeatures.finalizedFeatures().size());
49-
}
5039
}

0 commit comments

Comments
 (0)