Skip to content
4 changes: 2 additions & 2 deletions .idea/workspace.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,21 @@ public class Sender {
* @param values the values to send as message's payload
*/
public void send(String topic, Map<String, String> values) {
// Sanitize the topic parameter to prevent log injection
String sanitizedTopic = topic.replaceAll("[^a-zA-Z0-9._-]", "");
String json = new JSONPayloadSerializer(values).toJSON();
LOGGER.info("sending payload='{}' to topic {}", json, topic);
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, topic, json);
LOGGER.info("sending payload='{}' to topic {}", json, sanitizedTopic);
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(sanitizedTopic, topic, json);
future.whenCompleteAsync(
(result, throwable) -> {
if (throwable != null) {
LOGGER.error(
"Unable to send message to =[" + topic + "] due to : " + throwable.getMessage());
"Unable to send message to =[" + sanitizedTopic + "] due to : " + throwable.getMessage());
future.completeExceptionally(throwable);
} else {
LOGGER.info(
"Sent message to topic=["
+ topic
+ sanitizedTopic
+ "] with offset=["
+ result.getRecordMetadata().offset()
+ "]");
Expand Down
Loading