From 218215742ec528455736d16895a01166ad29f38b Mon Sep 17 00:00:00 2001
From: Francis Altomare
Date: Thu, 2 May 2024 10:32:44 +0200
Subject: [PATCH 1/5] PoC for supporting unique Kafka producer client ids

---
 .../connector/kafka/sink/ClientIdFactory.java | 14 ++++++
 .../sink/FlinkKafkaInternalProducer.java      | 33 +++++++++----
 .../flink/connector/kafka/sink/KafkaSink.java |  6 +++
 .../kafka/sink/KafkaSinkBuilder.java          | 15 ++++++
 .../connector/kafka/sink/KafkaWriter.java     | 46 +++++++++++--------
 5 files changed, 86 insertions(+), 28 deletions(-)
 create mode 100644 flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ClientIdFactory.java

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ClientIdFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ClientIdFactory.java
new file mode 100644
index 000000000..66cf9efde
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ClientIdFactory.java
@@ -0,0 +1,14 @@
+package org.apache.flink.connector.kafka.sink;
+
+public class ClientIdFactory {
+    private static final String CLIENT_ID_DELIMITER = "-";
+
+    public static String buildClientId(
+            String clientIdPrefix,
+            int subtaskId
+    ) {
+        return clientIdPrefix
+                + CLIENT_ID_DELIMITER
+                + subtaskId;
+    }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index e514054d7..283d0fff1 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -51,13 +51,17 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
             "org.apache.kafka.clients.producer.internals.TransactionManager$State";
     private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
 
-    @Nullable private String transactionalId;
+    @Nullable
+    private String transactionalId;
     private volatile boolean inTransaction;
     private volatile boolean hasRecordsInTransaction;
     private volatile boolean closed;
 
-    public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
-        super(withTransactionalId(properties, transactionalId));
+    public FlinkKafkaInternalProducer(Properties properties,
+            @Nullable String transactionalId,
+            @Nullable String clientId
+    ) {
+        super(withClientId(withTransactionalId(properties, transactionalId), ""));
         this.transactionalId = transactionalId;
     }
 
@@ -72,6 +76,17 @@ private static Properties withTransactionalId(
         return props;
     }
 
+    private static Properties withClientId(Properties properties, @Nullable String clientId) {
+        if (clientId == null) {
+            return properties;
+        }
+
+        Properties props = new Properties();
+        props.putAll(properties);
+        props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+        return props;
+    }
+
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         if (inTransaction) {
@@ -223,8 +238,8 @@ private TransactionalRequestResult enqueueNewPartitions() {
                     invoke(
                             transactionManager,
                             "enqueueRequest",
-                            new Class[] {txnRequestHandler.getClass().getSuperclass()},
-                            new Object[] {txnRequestHandler});
+                            new Class[]{txnRequestHandler.getClass().getSuperclass()},
+                            new Object[]{txnRequestHandler});
             result =
                     (TransactionalRequestResult)
                             getField(
@@ -344,10 +359,10 @@ private static Object createProducerIdAndEpoch(long producerId, short epoch) {
             constructor.setAccessible(true);
             return constructor.newInstance(producerId, epoch);
         } catch (InvocationTargetException
-                | InstantiationException
-                | IllegalAccessException
-                | NoSuchFieldException
-                | NoSuchMethodException e) {
+                        | InstantiationException
+                        | IllegalAccessException
+                        | NoSuchFieldException
+                        | NoSuchMethodException e) {
             throw new RuntimeException("Incompatible KafkaProducer version", e);
         }
     }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index d5b1c3700..669b2b482 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -61,15 +61,19 @@ public class KafkaSink<IN>
     private final KafkaRecordSerializationSchema<IN> recordSerializer;
     private final Properties kafkaProducerConfig;
     private final String transactionalIdPrefix;
+    private final String clientIdPrefix;
+
 
     KafkaSink(
             DeliveryGuarantee deliveryGuarantee,
             Properties kafkaProducerConfig,
             String transactionalIdPrefix,
+            String clientIdPrefix,
             KafkaRecordSerializationSchema<IN> recordSerializer) {
         this.deliveryGuarantee = deliveryGuarantee;
         this.kafkaProducerConfig = kafkaProducerConfig;
         this.transactionalIdPrefix = transactionalIdPrefix;
+        this.clientIdPrefix = clientIdPrefix;
         this.recordSerializer = recordSerializer;
     }
 
@@ -102,6 +106,7 @@ public KafkaWriter<IN> createWriter(InitContext context) throws IOException {
                 deliveryGuarantee,
                 kafkaProducerConfig,
                 transactionalIdPrefix,
+                clientIdPrefix,
                 context,
                 recordSerializer,
                 context.asSerializationSchemaInitializationContext(),
@@ -116,6 +121,7 @@ public KafkaWriter<IN> restoreWriter(
                 deliveryGuarantee,
                 kafkaProducerConfig,
                 transactionalIdPrefix,
+                clientIdPrefix,
                 context,
                 recordSerializer,
                 context.asSerializationSchemaInitializationContext(),
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
index f0c20cfc0..3a17ac301 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
@@ -71,6 +71,7 @@ public class KafkaSinkBuilder<IN> {
 
     private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
     private String transactionalIdPrefix = "kafka-sink";
+    private String clientIdPrefix = null;
 
     private final Properties kafkaProducerConfig;
     private KafkaRecordSerializationSchema<? super IN> recordSerializer;
@@ -190,6 +191,20 @@ public KafkaSinkBuilder<IN> setBootstrapServers(String bootstrapServers) {
         return setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
     }
 
+    /**
+     * Sets the prefix used to build the {@code client.id} of every KafkaProducer created by this
+     * sink. Any {@code client.id} set in the provided Kafka producer configuration is overwritten;
+     * instead, each producer derives its {@code client.id} from this prefix and its subtask id,
+     * so every producer gets a unique {@code client.id}.
+     *
+     * @param clientIdPrefix Prefix to use
+     * @return {@link KafkaSinkBuilder}
+     */
+    public KafkaSinkBuilder<IN> setClientIdPrefix(String clientIdPrefix) {
+        this.clientIdPrefix = checkNotNull(clientIdPrefix, "clientIdPrefix");
+        return this;
+    }
+
     private void sanityCheck() {
         checkNotNull(
                 kafkaProducerConfig.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 0cc16b219..6efc792bf 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -68,7 +68,7 @@
  */
 class KafkaWriter<IN>
         implements TwoPhaseCommittingStatefulSink.PrecommittingStatefulSinkWriter<
-                IN, KafkaWriterState, KafkaCommittable> {
+        IN, KafkaWriterState, KafkaCommittable> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaWriter.class);
     private static final String KAFKA_PRODUCER_METRIC_NAME = "KafkaProducer";
@@ -81,6 +81,7 @@ class KafkaWriter<IN>
     private final DeliveryGuarantee deliveryGuarantee;
     private final Properties kafkaProducerConfig;
     private final String transactionalIdPrefix;
+    private final String clientIdPrefix;
     private final KafkaRecordSerializationSchema<IN> recordSerializer;
     private final Callback deliveryCallback;
     private final KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext;
@@ -115,22 +116,24 @@ class KafkaWriter<IN>
      * KafkaRecordSerializationSchema#open(SerializationSchema.InitializationContext,
      * KafkaRecordSerializationSchema.KafkaSinkContext)} fails.
      *
-     * @param deliveryGuarantee the Sink's delivery guarantee
-     * @param kafkaProducerConfig the properties to configure the {@link FlinkKafkaInternalProducer}
+     * @param deliveryGuarantee     the Sink's delivery guarantee
+     * @param kafkaProducerConfig   the properties to configure the {@link FlinkKafkaInternalProducer}
      * @param transactionalIdPrefix used to create the transactionalIds
-     * @param sinkInitContext context to provide information about the runtime environment
-     * @param recordSerializer serialize to transform the incoming records to {@link ProducerRecord}
-     * @param schemaContext context used to initialize the {@link KafkaRecordSerializationSchema}
-     * @param recoveredStates state from an previous execution which was covered
+     * @param sinkInitContext       context to provide information about the runtime environment
+     * @param recordSerializer      serialize to transform the incoming records to {@link ProducerRecord}
+     * @param schemaContext         context used to initialize the {@link KafkaRecordSerializationSchema}
+     * @param recoveredStates       state from an previous execution which was covered
      */
     KafkaWriter(
             DeliveryGuarantee deliveryGuarantee,
             Properties kafkaProducerConfig,
             String transactionalIdPrefix,
+            String clientIdPrefix,
             Sink.InitContext sinkInitContext,
             KafkaRecordSerializationSchema<IN> recordSerializer,
             SerializationSchema.InitializationContext schemaContext,
             Collection<KafkaWriterState> recoveredStates) {
+        this.clientIdPrefix = clientIdPrefix;
         this.deliveryGuarantee = checkNotNull(deliveryGuarantee, "deliveryGuarantee");
         this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, "kafkaProducerConfig");
         this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix");
@@ -142,11 +145,11 @@ class KafkaWriter<IN>
                         sinkInitContext.metadataConsumer().orElse(null));
         this.disabledMetrics =
                 kafkaProducerConfig.containsKey(KEY_DISABLE_METRICS)
-                                && Boolean.parseBoolean(
-                                        kafkaProducerConfig.get(KEY_DISABLE_METRICS).toString())
+                        && Boolean.parseBoolean(
+                        kafkaProducerConfig.get(KEY_DISABLE_METRICS).toString())
                         || kafkaProducerConfig.containsKey(KEY_REGISTER_METRICS)
-                                && !Boolean.parseBoolean(
-                                        kafkaProducerConfig.get(KEY_REGISTER_METRICS).toString());
+                        && !Boolean.parseBoolean(
+                        kafkaProducerConfig.get(KEY_REGISTER_METRICS).toString());
         this.timeService = sinkInitContext.getProcessingTimeService();
         this.metricGroup = sinkInitContext.metricGroup();
         this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
@@ -175,7 +178,9 @@ class KafkaWriter<IN>
             this.currentProducer.beginTransaction();
         } else if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE
                 || deliveryGuarantee == DeliveryGuarantee.NONE) {
-            this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, null);
+            String clientId =
+                    ClientIdFactory.buildClientId(clientIdPrefix, kafkaSinkContext.getParallelInstanceId());
+            this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, null, clientId);
             producerCloseables.add(this.currentProducer);
             initKafkaMetrics(this.currentProducer);
         } else {
@@ -290,11 +295,11 @@ void abortLingeringTransactions(
         }
 
         try (TransactionAborter transactionAborter =
-                new TransactionAborter(
-                        kafkaSinkContext.getParallelInstanceId(),
-                        kafkaSinkContext.getNumberOfParallelInstances(),
-                        this::getOrCreateTransactionalProducer,
-                        producerPool::add)) {
+                     new TransactionAborter(
+                             kafkaSinkContext.getParallelInstanceId(),
+                             kafkaSinkContext.getNumberOfParallelInstances(),
+                             this::getOrCreateTransactionalProducer,
+                             producerPool::add)) {
             transactionAborter.abortLingeringTransactions(prefixesToAbort, startCheckpointId);
         }
     }
@@ -332,7 +337,9 @@ private FlinkKafkaInternalProducer<byte[], byte[]> getOrCreateTransactionalProdu
             String transactionalId) {
         FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.poll();
         if (producer == null) {
-            producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId);
+            String clientId =
+                    ClientIdFactory.buildClientId(clientIdPrefix, kafkaSinkContext.getParallelInstanceId());
+            producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId, clientId);
             producerCloseables.add(producer);
             producer.initTransactions();
             initKafkaMetrics(producer);
@@ -424,7 +431,8 @@ private void checkAsyncException() throws IOException {
 
     private class WriterCallback implements Callback {
         private final MailboxExecutor mailboxExecutor;
-        @Nullable private final Consumer<RecordMetadata> metadataConsumer;
+        @Nullable
+        private final Consumer<RecordMetadata> metadataConsumer;
 
         public WriterCallback(
                 MailboxExecutor mailboxExecutor,

From 07eff603e910d52804bf80c5d0ed13d44d0c2817 Mon Sep 17 00:00:00 2001
From: Francis Altomare
Date: Thu, 2 May 2024 10:53:39 +0200
Subject: [PATCH 2/5] Added prefix to builder

---
 .../org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
index 3a17ac301..9683a7cc8 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
@@ -225,6 +225,6 @@ public KafkaSink<IN> build() {
         sanityCheck();
         return new KafkaSink<>(
-                deliveryGuarantee, kafkaProducerConfig, transactionalIdPrefix, recordSerializer);
+                deliveryGuarantee, kafkaProducerConfig, transactionalIdPrefix, transactionalIdPrefix, recordSerializer);
     }
 }

From 62052cd4a24057b3fa720f300d1ded1ef1265ded Mon Sep 17 00:00:00 2001
From: Francis Altomare
Date: Thu, 2 May 2024 12:47:56 +0200
Subject: [PATCH 3/5] providing client id

---
 .../sink/FlinkKafkaInternalProducer.java      | 27 ++++++++++---------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index 283d0fff1..1c48cf308 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -61,29 +61,30 @@ public FlinkKafkaInternalProducer(Properties properties,
             @Nullable String transactionalId,
             @Nullable String clientId
     ) {
-        super(withClientId(withTransactionalId(properties, transactionalId), ""));
+        super(withTransactionAndClientIds(properties, transactionalId, clientId));
         this.transactionalId = transactionalId;
     }
 
-    private static Properties withTransactionalId(
-            Properties properties, @Nullable String transactionalId) {
-        if (transactionalId == null) {
+    private static Properties withTransactionAndClientIds(
+            Properties properties,
+            @Nullable String transactionalId,
+            @Nullable String clientId
+    ) {
+        if(transactionalId == null && clientId == null) {
             return properties;
         }
+
         Properties props = new Properties();
         props.putAll(properties);
-        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        return props;
-    }
 
-    private static Properties withClientId(Properties properties, @Nullable String clientId) {
-        if (clientId == null) {
-            return properties;
+        if(transactionalId != null) {
+            props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        }
+
+        if(clientId != null) {
+            props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId);
         }
 
-        Properties props = new Properties();
-        props.putAll(properties);
-        props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId);
         return props;
     }

From 845ab6657d574faec75595050f90250a326dda4f Mon Sep 17 00:00:00 2001
From: Francis Altomare
Date: Thu, 2 May 2024 13:20:34 +0200
Subject: [PATCH 4/5] Refactor to avoid changing state store

---
 .../sink/FlinkKafkaInternalProducer.java      | 42 ++++--------
 .../kafka/sink/KafkaSinkBuilder.java          |  2 +-
 .../connector/kafka/sink/KafkaWriter.java     | 66 +++++++++++--------
 3 files changed, 54 insertions(+), 56 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index 1c48cf308..e514054d7 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -51,40 +51,24 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
             "org.apache.kafka.clients.producer.internals.TransactionManager$State";
     private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
 
-    @Nullable
-    private String transactionalId;
+    @Nullable private String transactionalId;
     private volatile boolean inTransaction;
     private volatile boolean hasRecordsInTransaction;
     private volatile boolean closed;
 
-    public FlinkKafkaInternalProducer(Properties properties,
-            @Nullable String transactionalId,
-            @Nullable String clientId
-    ) {
-        super(withTransactionAndClientIds(properties, transactionalId, clientId));
+    public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
+        super(withTransactionalId(properties, transactionalId));
         this.transactionalId = transactionalId;
     }
 
-    private static Properties withTransactionAndClientIds(
-            Properties properties,
-            @Nullable String transactionalId,
-            @Nullable String clientId
-    ) {
-        if(transactionalId == null && clientId == null) {
+    private static Properties withTransactionalId(
+            Properties properties, @Nullable String transactionalId) {
+        if (transactionalId == null) {
             return properties;
         }
-
         Properties props = new Properties();
         props.putAll(properties);
-
-        if(transactionalId != null) {
-            props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        }
-
-        if(clientId != null) {
-            props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-        }
-
+        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
         return props;
     }
 
@@ -239,8 +223,8 @@ private TransactionalRequestResult enqueueNewPartitions() {
                     invoke(
                             transactionManager,
                             "enqueueRequest",
-                            new Class[]{txnRequestHandler.getClass().getSuperclass()},
-                            new Object[]{txnRequestHandler});
+                            new Class[] {txnRequestHandler.getClass().getSuperclass()},
+                            new Object[] {txnRequestHandler});
             result =
                     (TransactionalRequestResult)
                             getField(
                                     transactionManager,
@@ -360,10 +344,10 @@ private static Object createProducerIdAndEpoch(long producerId, short epoch) {
             constructor.setAccessible(true);
             return constructor.newInstance(producerId, epoch);
         } catch (InvocationTargetException
-                        | InstantiationException
-                        | IllegalAccessException
-                        | NoSuchFieldException
-                        | NoSuchMethodException e) {
+                | InstantiationException
+                | IllegalAccessException
+                | NoSuchFieldException
+                | NoSuchMethodException e) {
             throw new RuntimeException("Incompatible KafkaProducer version", e);
         }
     }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
index 9683a7cc8..0dd8be86c 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
@@ -225,6 +225,6 @@ public KafkaSink<IN> build() {
         sanityCheck();
         return new KafkaSink<>(
-                deliveryGuarantee, kafkaProducerConfig, transactionalIdPrefix, transactionalIdPrefix, recordSerializer);
+                deliveryGuarantee, kafkaProducerConfig, transactionalIdPrefix, clientIdPrefix, recordSerializer);
     }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 6efc792bf..6c03978b1 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -32,6 +32,7 @@
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Metric;
@@ -68,7 +69,7 @@
  */
 class KafkaWriter<IN>
         implements TwoPhaseCommittingStatefulSink.PrecommittingStatefulSinkWriter<
-        IN, KafkaWriterState, KafkaCommittable> {
+                IN, KafkaWriterState, KafkaCommittable> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaWriter.class);
     private static final String KAFKA_PRODUCER_METRIC_NAME = "KafkaProducer";
@@ -81,7 +82,6 @@ class KafkaWriter<IN>
     private final DeliveryGuarantee deliveryGuarantee;
     private final Properties kafkaProducerConfig;
     private final String transactionalIdPrefix;
-    private final String clientIdPrefix;
     private final KafkaRecordSerializationSchema<IN> recordSerializer;
     private final Callback deliveryCallback;
     private final KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext;
@@ -116,13 +116,13 @@ class KafkaWriter<IN>
      * KafkaRecordSerializationSchema#open(SerializationSchema.InitializationContext,
      * KafkaRecordSerializationSchema.KafkaSinkContext)} fails.
      *
-     * @param deliveryGuarantee     the Sink's delivery guarantee
-     * @param kafkaProducerConfig   the properties to configure the {@link FlinkKafkaInternalProducer}
+     * @param deliveryGuarantee the Sink's delivery guarantee
+     * @param kafkaProducerConfig the properties to configure the {@link FlinkKafkaInternalProducer}
      * @param transactionalIdPrefix used to create the transactionalIds
-     * @param sinkInitContext       context to provide information about the runtime environment
-     * @param recordSerializer      serialize to transform the incoming records to {@link ProducerRecord}
-     * @param schemaContext         context used to initialize the {@link KafkaRecordSerializationSchema}
-     * @param recoveredStates       state from an previous execution which was covered
+     * @param sinkInitContext context to provide information about the runtime environment
+     * @param recordSerializer serialize to transform the incoming records to {@link ProducerRecord}
+     * @param schemaContext context used to initialize the {@link KafkaRecordSerializationSchema}
+     * @param recoveredStates state from an previous execution which was covered
      */
     KafkaWriter(
             DeliveryGuarantee deliveryGuarantee,
@@ -133,23 +133,23 @@ class KafkaWriter<IN>
             KafkaRecordSerializationSchema<IN> recordSerializer,
             SerializationSchema.InitializationContext schemaContext,
             Collection<KafkaWriterState> recoveredStates) {
-        this.clientIdPrefix = clientIdPrefix;
         this.deliveryGuarantee = checkNotNull(deliveryGuarantee, "deliveryGuarantee");
         this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig, "kafkaProducerConfig");
         this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix, "transactionalIdPrefix");
         this.recordSerializer = checkNotNull(recordSerializer, "recordSerializer");
         checkNotNull(sinkInitContext, "sinkInitContext");
+        overwriteClientId(kafkaProducerConfig, clientIdPrefix, sinkInitContext.getSubtaskId());
         this.deliveryCallback =
                 new WriterCallback(
                         sinkInitContext.getMailboxExecutor(),
                         sinkInitContext.metadataConsumer().orElse(null));
         this.disabledMetrics =
                 kafkaProducerConfig.containsKey(KEY_DISABLE_METRICS)
-                        && Boolean.parseBoolean(
-                        kafkaProducerConfig.get(KEY_DISABLE_METRICS).toString())
+                                && Boolean.parseBoolean(
+                                        kafkaProducerConfig.get(KEY_DISABLE_METRICS).toString())
                         || kafkaProducerConfig.containsKey(KEY_REGISTER_METRICS)
-                        && !Boolean.parseBoolean(
-                        kafkaProducerConfig.get(KEY_REGISTER_METRICS).toString());
+                                && !Boolean.parseBoolean(
+                                        kafkaProducerConfig.get(KEY_REGISTER_METRICS).toString());
         this.timeService = sinkInitContext.getProcessingTimeService();
         this.metricGroup = sinkInitContext.metricGroup();
         this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
@@ -178,9 +178,7 @@ class KafkaWriter<IN>
             this.currentProducer.beginTransaction();
         } else if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE
                 || deliveryGuarantee == DeliveryGuarantee.NONE) {
-            String clientId =
-                    ClientIdFactory.buildClientId(clientIdPrefix, kafkaSinkContext.getParallelInstanceId());
-            this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, null, clientId);
+            this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, null);
             producerCloseables.add(this.currentProducer);
             initKafkaMetrics(this.currentProducer);
         } else {
@@ -295,15 +293,34 @@ void abortLingeringTransactions(
         }
 
         try (TransactionAborter transactionAborter =
-                     new TransactionAborter(
-                             kafkaSinkContext.getParallelInstanceId(),
-                             kafkaSinkContext.getNumberOfParallelInstances(),
-                             this::getOrCreateTransactionalProducer,
-                             producerPool::add)) {
+                new TransactionAborter(
+                        kafkaSinkContext.getParallelInstanceId(),
+                        kafkaSinkContext.getNumberOfParallelInstances(),
+                        this::getOrCreateTransactionalProducer,
+                        producerPool::add)) {
             transactionAborter.abortLingeringTransactions(prefixesToAbort, startCheckpointId);
         }
     }
 
+    private static void overwriteClientId(Properties properties, String clientIdPrefix, int subtaskId) {
+        if(clientIdPrefix == null) {
+            return;
+        }
+        String updatedClientId = ClientIdFactory.buildClientId(clientIdPrefix, subtaskId);
+        overrideProperty(properties, ProducerConfig.CLIENT_ID_CONFIG, updatedClientId);
+    }
+
+    private static void overrideProperty(Properties properties, String key, String value) {
+        String userValue = properties.getProperty(key);
+        if(userValue != null) {
+            LOG.warn(
+                    String.format(
+                            "Property %s is provided but will be overridden from %s to %s",
+                            key, userValue, value));
+        }
+        properties.setProperty(key, value);
+    }
+
     /**
      * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new transactions
      * will not clash with transactions created during previous checkpoints ({@code
@@ -337,9 +354,7 @@ private FlinkKafkaInternalProducer<byte[], byte[]> getOrCreateTransactionalProdu
             String transactionalId) {
         FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.poll();
         if (producer == null) {
-            String clientId =
-                    ClientIdFactory.buildClientId(clientIdPrefix, kafkaSinkContext.getParallelInstanceId());
-            producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId, clientId);
+            producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId);
             producerCloseables.add(producer);
             producer.initTransactions();
             initKafkaMetrics(producer);
@@ -431,8 +446,7 @@ private void checkAsyncException() throws IOException {
 
     private class WriterCallback implements Callback {
         private final MailboxExecutor mailboxExecutor;
-        @Nullable
-        private final Consumer<RecordMetadata> metadataConsumer;
+        @Nullable private final Consumer<RecordMetadata> metadataConsumer;
 
         public WriterCallback(
                 MailboxExecutor mailboxExecutor,

From 700660f70bee3438fef27b5c131c784e9a1cbef7 Mon Sep 17 00:00:00 2001
From: Francis Altomare
Date: Thu, 2 May 2024 14:04:56 +0200
Subject: [PATCH 5/5] Fix style errors

---
 .../connector/kafka/sink/ClientIdFactory.java | 29 +++++++++++++++++++
 .../flink/connector/kafka/sink/KafkaSink.java |  1 -
 .../kafka/sink/KafkaSinkBuilder.java          | 19 ++++++++++++
 .../connector/kafka/sink/KafkaWriter.java     |  4 +--
 4 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ClientIdFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ClientIdFactory.java
index 66cf9efde..e1fa3defe 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ClientIdFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ClientIdFactory.java
@@ -1,8 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.connector.kafka.sink;
 
+/**
+ * Construct a Kafka ClientId using a prefix
+ * and a subtask id.
+ */
 public class ClientIdFactory {
     private static final String CLIENT_ID_DELIMITER = "-";
 
+    /**
+     * Construct a Kafka client id in the following format
+     * {@code clientIdPrefix-subtaskId}.
+     *
+     * @param clientIdPrefix prefix for the id
+     * @param subtaskId id of the Kafka producer subtask
+     * @return clientId
+     */
     public static String buildClientId(
             String clientIdPrefix,
             int subtaskId
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
index 669b2b482..d7efcb96b 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSink.java
@@ -63,7 +63,6 @@ public class KafkaSink<IN>
     private final String transactionalIdPrefix;
     private final String clientIdPrefix;
 
-
     KafkaSink(
             DeliveryGuarantee deliveryGuarantee,
             Properties kafkaProducerConfig,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
index 0dd8be86c..59afd3bf0 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java
@@ -217,6 +217,25 @@ private void sanityCheck() {
         checkNotNull(recordSerializer, "recordSerializer");
     }
 
+    private static void overwriteConfig(Properties properties, String clientIdPrefix, int subtaskId) {
+        if (clientIdPrefix == null) {
+            return;
+        }
+        String updatedClientId = ClientIdFactory.buildClientId(clientIdPrefix, subtaskId);
+        overrideProperty(properties, ProducerConfig.CLIENT_ID_CONFIG, updatedClientId);
+    }
+
+    private static void overrideProperty(Properties properties, String key, String value) {
+        String userValue = properties.getProperty(key);
+        if (userValue != null) {
+            LOG.warn(
+                    String.format(
+                            "Property %s is provided but will be overridden from %s to %s",
+                            key, userValue, value));
+        }
+        properties.setProperty(key, value);
+    }
+
     /**
      * Constructs the {@link KafkaSink} with the configured properties.
      *
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
index 6c03978b1..d628d437a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
@@ -303,7 +303,7 @@ void abortLingeringTransactions(
     }
 
     private static void overwriteClientId(Properties properties, String clientIdPrefix, int subtaskId) {
-        if(clientIdPrefix == null) {
+        if (clientIdPrefix == null) {
             return;
         }
         String updatedClientId = ClientIdFactory.buildClientId(clientIdPrefix, subtaskId);
         overrideProperty(properties, ProducerConfig.CLIENT_ID_CONFIG, updatedClientId);
@@ -312,7 +312,7 @@ private static void overwriteClientId(Properties properties, String clientIdPref
 
     private static void overrideProperty(Properties properties, String key, String value) {
         String userValue = properties.getProperty(key);
-        if(userValue != null) {
+        if (userValue != null) {
             LOG.warn(
                     String.format(
                             "Property %s is provided but will be overridden from %s to %s",
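
---

A minimal usage sketch of the builder API this series adds (not itself part of any patch). The bootstrap address, topic name, and prefix value below are placeholder assumptions; with the series applied, the writer for subtask N derives its client.id as "<prefix>-N" via ClientIdFactory, so parallel producers report stable, unique ids instead of Kafka's auto-generated producer-1, producer-2, ...

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    public class ClientIdPrefixExample {
        public static void main(String[] args) {
            // Sink whose producers get client.id "orders-job-0", "orders-job-1", ...
            // (one per parallel subtask).
            KafkaSink<String> sink =
                    KafkaSink.<String>builder()
                            .setBootstrapServers("localhost:9092") // placeholder broker
                            .setRecordSerializer(
                                    KafkaRecordSerializationSchema.builder()
                                            .setTopic("orders") // placeholder topic
                                            .setValueSerializationSchema(new SimpleStringSchema())
                                            .build())
                            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                            .setClientIdPrefix("orders-job") // new API from this series
                            .build();
            // stream.sinkTo(sink) would attach it to a DataStream<String>.
        }
    }

Note that any client.id already present in the producer config is replaced, with a warning, per overrideProperty in patch 4.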