Fix deserialization of kafka producer json config in the kafka-reporter-plugin. (#542)
gzlicanyi authored May 31, 2023
1 parent c04a714 commit dc602b7
Showing 3 changed files with 31 additions and 16 deletions.
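Note: the change replaces a raw gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class) call with a TypeToken-based one. Below is a minimal sketch (not part of the commit; the class name is illustrative) of the difference, assuming Gson's default mapping of JSON numbers to Double when no target type is supplied:

import java.util.Map;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

public class GsonMapDeserializationSketch {
    public static void main(String[] args) {
        String json = "{\"batch.size\":32768}";
        Gson gson = new Gson();

        // Without a target type, Gson maps JSON numbers to Double,
        // so the value is 32768.0 and fails later when used as a String.
        Map<?, ?> raw = gson.fromJson(json, Map.class);
        System.out.println(raw.get("batch.size").getClass()); // class java.lang.Double

        // With an explicit TypeToken<Map<String, String>>, the number token
        // is read back as the String "32768", which Properties.setProperty accepts.
        Map<String, String> typed = gson.fromJson(json,
                new TypeToken<Map<String, String>>() { }.getType());
        System.out.println(typed.get("batch.size")); // 32768
    }
}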
1 change: 1 addition & 0 deletions CHANGES.md
@@ -19,6 +19,7 @@ Release Notes.
* Fix possible IllegalStateException when using Micrometer.
* Support Grizzly Work ThreadPool Metric Monitor
* Fix the gson dependency in the kafka-reporter-plugin.
* Fix deserialization of kafka producer json config in the kafka-reporter-plugin.
* Support to config custom decode methods for kafka configurations

#### Documentation
KafkaProducerManager.java
@@ -18,8 +18,8 @@

package org.apache.skywalking.apm.agent.core.kafka;

import com.google.gson.reflect.TypeToken;
import com.google.gson.Gson;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -34,7 +34,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -107,11 +106,7 @@ public void run() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Kafka.BOOTSTRAP_SERVERS);

if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
Gson gson = new Gson();
Map<String, String> config = (Map<String, String>) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class);
decode(config).forEach(properties::setProperty);
}
setPropertiesFromJsonConfig(properties);
decode(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty);

try (AdminClient adminClient = AdminClient.create(properties)) {
@@ -131,12 +126,12 @@ public void run() {
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());

if (!topics.isEmpty()) {
LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics);
return;
}

try {
producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
} catch (Exception e) {
@@ -149,6 +144,15 @@ public void run() {
}
}

void setPropertiesFromJsonConfig(Properties properties) {
if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
Gson gson = new Gson();
Map<String, String> config = gson.fromJson(Kafka.PRODUCER_CONFIG_JSON,
new TypeToken<Map<String, String>>() { }.getType());
decode(config).forEach(properties::setProperty);
}
}

private void notifyListeners(KafkaConnectionStatus status) {
for (KafkaConnectionStatusListener listener : listeners) {
listener.onStatusChanged(status);
KafkaProducerManagerTest.java
@@ -18,16 +18,15 @@

package org.apache.skywalking.apm.agent.core.kafka;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import java.lang.reflect.Method;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;

public class KafkaProducerManagerTest {
@Test
@@ -39,8 +38,8 @@ public void testAddListener() throws Exception {
kafkaProducerManager.addListener(new MockListener(counter));
}
Method notifyListeners = kafkaProducerManager
.getClass()
.getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class);
.getClass()
.getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class);
notifyListeners.setAccessible(true);
notifyListeners.invoke(kafkaProducerManager, KafkaConnectionStatus.CONNECTED);

@@ -60,6 +59,17 @@ public void testFormatTopicNameThenRegister() {
assertEquals(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS, value);
}

@Test
public void testSetPropertiesFromJsonConfig() {
KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();
Properties properties = new Properties();

KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG_JSON = "{\"batch.size\":32768}";
kafkaProducerManager.setPropertiesFromJsonConfig(properties);

assertEquals(properties.get("batch.size"), "32768");
}

@Test
public void testDecode() throws Exception {
KafkaReporterPluginConfig.Plugin.Kafka.DECODE_CLASS = "org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManagerTest$DecodeTool";
