Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,7 @@ project(':test-common:test-common-runtime') {
implementation project(':raft')
implementation project(':storage')

implementation libs.bcpkix
implementation libs.junitPlatformLanucher
implementation libs.junitJupiter
implementation libs.jacksonDatabindYaml
Expand Down
2 changes: 2 additions & 0 deletions checkstyle/import-control-test-common-runtime.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@
<allow pkg="java" />
<allow pkg="scala.jdk.javaapi" />
<allow pkg="javax.security" />
<allow pkg="org.bouncycastle" />
<allow pkg="javax.net" />
</import-control>
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ default <K, V> Producer<K, V> producer(Map<String, Object> configs) {
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
return new KafkaProducer<>(setClientSaslConfig(props));
return new KafkaProducer<>(setClientSaslConfig(setClientSslConfig(props)));
}

default <K, V> Producer<K, V> producer() {
Expand All @@ -166,7 +166,7 @@ default <K, V> Consumer<K, V> consumer(Map<String, Object> configs) {
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5));
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
return new KafkaConsumer<>(setClientSaslConfig(props));
return new KafkaConsumer<>(setClientSaslConfig(setClientSslConfig(props)));
}

default <K, V> Consumer<K, V> consumer() {
Expand All @@ -191,7 +191,7 @@ default <K, V> ShareConsumer<K, V> shareConsumer(Map<String, Object> configs, De
}
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5));
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
return new KafkaShareConsumer<>(setClientSaslConfig(props), keyDeserializer, valueDeserializer);
return new KafkaShareConsumer<>(setClientSaslConfig(setClientSslConfig(props)), keyDeserializer, valueDeserializer);
}

default Admin admin(Map<String, Object> configs, boolean usingBootstrapControllers) {
Expand All @@ -203,7 +203,7 @@ default Admin admin(Map<String, Object> configs, boolean usingBootstrapControlle
props.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
props.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
}
return Admin.create(setClientSaslConfig(props));
return Admin.create(setClientSaslConfig(setClientSslConfig(props)));
}

default Map<String, Object> setClientSaslConfig(Map<String, Object> configs) {
Expand All @@ -222,6 +222,8 @@ default Map<String, Object> setClientSaslConfig(Map<String, Object> configs) {
return props;
}

Map<String, Object> setClientSslConfig(Map<String, Object> configs);

default Admin admin(Map<String, Object> configs) {
return admin(configs, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,21 +144,21 @@ public Builder setInitialVoterSet(Map<Integer, Uuid> initialVoterSet) {
return this;
}

private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
private KafkaConfig createNodeConfig(TestKitNode node, Map<String, Object> sslConfig) throws IOException {
TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
TestKitNode controllerNode = nodes.controllerNodes().get(node.id());

Map<String, Object> props = new HashMap<>(configProps);
props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG,
Long.toString(TimeUnit.MINUTES.toMillis(10)));
props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, roles(node.id()));
props.put(KRaftConfigs.NODE_ID_CONFIG,
Integer.toString(node.id()));
props.put(KRaftConfigs.NODE_ID_CONFIG, Integer.toString(node.id()));

// In combined mode, always prefer the metadata log directory of the controller node.
if (controllerNode != null) {
props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG,
controllerNode.metadataDirectory());
setSecurityProtocolProps(props, controllerSecurityProtocol);
setSecurityProtocolProps(props, controllerSecurityProtocol, sslConfig);
} else {
props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG,
node.metadataDirectory());
Expand All @@ -167,7 +167,7 @@ private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
// Set the log.dirs according to the broker node setting (if there is a broker node)
props.put(LOG_DIRS_CONFIG,
String.join(",", brokerNode.logDataDirectories()));
setSecurityProtocolProps(props, brokerSecurityProtocol);
setSecurityProtocolProps(props, brokerSecurityProtocol, sslConfig);
} else {
// Set log.dirs equal to the metadata directory if there is just a controller.
props.put(LOG_DIRS_CONFIG,
Expand Down Expand Up @@ -216,19 +216,32 @@ private KafkaConfig createNodeConfig(TestKitNode node) throws IOException {
return new KafkaConfig(props, false);
}

private void setSecurityProtocolProps(Map<String, Object> props, String securityProtocol) {
private void setSecurityProtocolProps(
Map<String, Object> props,
String securityProtocol,
Map<String, Object> sslConfig
) {
if (securityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
props.putIfAbsent(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, "PLAIN");
props.putIfAbsent(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN");
props.putIfAbsent(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, "PLAIN");
props.putIfAbsent(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, StandardAuthorizer.class.getName());
props.putIfAbsent(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false");
props.putIfAbsent(StandardAuthorizer.SUPER_USERS_CONFIG, "User:" + JaasUtils.KAFKA_PLAIN_ADMIN);
} else if (securityProtocol.equals(SecurityProtocol.SASL_SSL.name)) {
props.putIfAbsent(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, "PLAIN");
props.putIfAbsent(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN");
props.putIfAbsent(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, "PLAIN");
props.putIfAbsent(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, StandardAuthorizer.class.getName());
props.putIfAbsent(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, "false");
props.putIfAbsent(StandardAuthorizer.SUPER_USERS_CONFIG, "User:" + JaasUtils.KAFKA_PLAIN_ADMIN);
sslConfig.forEach(props::putIfAbsent);
}
}

private Optional<File> maybeSetupJaasFile() throws Exception {
if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name) ||
brokerSecurityProtocol.equals(SecurityProtocol.SASL_SSL.name)) {
File file = JaasUtils.writeJaasContextsToFile(Set.of(
new JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME,
List.of(
Expand Down Expand Up @@ -260,6 +273,8 @@ public KafkaClusterTestKit build() throws Exception {
Map<Integer, SharedServer> jointServers = new HashMap<>();
File baseDirectory = null;
Optional<File> jaasFile = maybeSetupJaasFile();
SslManager sslManager = new SslManager();
Map<String, Object> sslConfig = sslManager.createSslConfig();
try {
baseDirectory = new File(nodes.baseDirectory());
for (TestKitNode node : nodes.controllerNodes().values()) {
Expand All @@ -270,7 +285,7 @@ public KafkaClusterTestKit build() throws Exception {
}
for (TestKitNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory, node.metadataDirectory(), List.of());
KafkaConfig config = createNodeConfig(node);
KafkaConfig config = createNodeConfig(node, sslConfig);
SharedServer sharedServer = new SharedServer(
config,
node.initialMetaPropertiesEnsemble(),
Expand Down Expand Up @@ -298,7 +313,7 @@ public KafkaClusterTestKit build() throws Exception {
for (TestKitNode node : nodes.brokerNodes().values()) {
SharedServer sharedServer = jointServers.get(node.id());
if (sharedServer == null) {
KafkaConfig config = createNodeConfig(node);
KafkaConfig config = createNodeConfig(node, sslConfig);
sharedServer = new SharedServer(
config,
node.initialMetaPropertiesEnsemble(),
Expand Down Expand Up @@ -342,6 +357,7 @@ public KafkaClusterTestKit build() throws Exception {
faultHandlerFactory,
socketFactoryManager,
jaasFile,
sslManager,
standalone,
initialVoterSet,
deleteOnClose);
Expand Down Expand Up @@ -389,6 +405,7 @@ private static void setupNodeDirectories(File baseDirectory,
private final PreboundSocketFactoryManager socketFactoryManager;
private final String controllerListenerName;
private final Optional<File> jaasFile;
private final SslManager sslManager;
private final boolean standalone;
private final Optional<Map<Integer, Uuid>> initialVoterSet;
private final boolean deleteOnClose;
Expand All @@ -401,6 +418,7 @@ private KafkaClusterTestKit(
SimpleFaultHandlerFactory faultHandlerFactory,
PreboundSocketFactoryManager socketFactoryManager,
Optional<File> jaasFile,
SslManager sslManager,
boolean standalone,
Optional<Map<Integer, Uuid>> initialVoterSet,
boolean deleteOnClose
Expand All @@ -420,6 +438,7 @@ private KafkaClusterTestKit(
this.socketFactoryManager = socketFactoryManager;
this.controllerListenerName = nodes.controllerListenerName().value();
this.jaasFile = jaasFile;
this.sslManager = sslManager;
this.standalone = standalone;
this.initialVoterSet = initialVoterSet;
this.deleteOnClose = deleteOnClose;
Expand Down Expand Up @@ -702,6 +721,10 @@ public MockFaultHandler nonFatalFaultHandler() {
return faultHandlerFactory.nonFatalFaultHandler();
}

public SslManager sslManager() {
return sslManager;
}

@Override
public void close() throws Exception {
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
Expand Down Expand Up @@ -729,6 +752,7 @@ public void close() throws Exception {
if (jaasFile.isPresent()) {
Utils.delete(jaasFile.get());
}
sslManager.close();
}
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.test;

import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Map;

public class SslManager {

private static final Logger log = LoggerFactory.getLogger(SslManager.class);
private final File keyStoreFile;
private final File trustStoreFile;
public static final String CLUSTER_TRUSTSTORE_PASSWORD = "cluster-truststore-password";

public SslManager() {
try {
keyStoreFile = TestUtils.tempFile("kafka.cluster.keystore", ".jks");
trustStoreFile = TestUtils.tempFile("kafka.server.truststore", ".jks");
} catch (IOException e) {
throw new RuntimeException("Failed to create keystore or truststore file", e);
}
}

public Map<String, Object> createSslConfig() {
try {
KeyPair clusterKeyPair = TestSslUtils.generateKeyPair("RSA");
String[] hostNames = {"localhost", "127.0.0.1"};
X509Certificate clusterCert = TestSslUtils.generateSignedCertificate(
"CN=kafka-cluster, O=Kafka Test Cluster",
clusterKeyPair,
0,
365,
null,
null,
"SHA256withRSA",
false,
true,
true,
hostNames
);

Password keyStorePassword = new Password("cluster-keystore-password");
Password keyPassword = new Password("cluster-key-password");

TestSslUtils.createKeyStore(
keyStoreFile.getPath(),
keyStorePassword,
keyPassword,
"kafka-cluster",
clusterKeyPair.getPrivate(),
clusterCert
);

Password trustStorePassword = new Password(CLUSTER_TRUSTSTORE_PASSWORD);
TestSslUtils.createTrustStore(
trustStoreFile.getPath(),
trustStorePassword,
Map.of("kafka-cluster", clusterCert)
);
log.info("Created unified SSL config - KeyStore: {}, TrustStore: {}", keyStoreFile.getPath(), trustStoreFile.getPath());
return Map.ofEntries(
Map.entry(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath()),
Map.entry(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassword),
Map.entry(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword),
Map.entry(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS"),

Map.entry(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath()),
Map.entry(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword),
Map.entry(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS"),

Map.entry(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"),
Map.entry(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, List.of("TLSv1.2")),

Map.entry(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, "SunX509"),
Map.entry(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, "PKIX"),
Map.entry(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "")
);
} catch (IOException | GeneralSecurityException e) {
throw new RuntimeException("Failed to create SSL config", e);
}
}

public String trustStoreLocation() {
return trustStoreFile != null ? trustStoreFile.getAbsolutePath() : null;
}

public void close() throws IOException {
if (keyStoreFile != null) {
Utils.delete(keyStoreFile);
}
if (trustStoreFile != null) {
Utils.delete(trustStoreFile);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ public TestKitNodes build() {
throw new IllegalArgumentException("Invalid value for numDisksPerBroker");
}
// TODO: remove this assertion after https://issues.apache.org/jira/browse/KAFKA-16680 is finished
if ((brokerSecurityProtocol != SecurityProtocol.PLAINTEXT && brokerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT) ||
(controllerSecurityProtocol != SecurityProtocol.PLAINTEXT && controllerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT)) {
throw new IllegalArgumentException("Currently only support PLAINTEXT / SASL_PLAINTEXT security protocol");
if ((brokerSecurityProtocol != SecurityProtocol.PLAINTEXT && brokerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT && brokerSecurityProtocol != SecurityProtocol.SASL_SSL) ||
(controllerSecurityProtocol != SecurityProtocol.PLAINTEXT && controllerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT && brokerSecurityProtocol != SecurityProtocol.SASL_SSL)) {
throw new IllegalArgumentException("Currently only support PLAINTEXT / SASL_PLAINTEXT / SASL_SSL security protocol");
}
if (baseDirectory == null) {
this.baseDirectory = TestUtils.tempDirectory().toPath();
Expand Down
Loading