From 1f42e5d8351b106512d9fc08f9b948e14199e9ca Mon Sep 17 00:00:00 2001
From: unknown <92518@GSC41150.globalia.com>
Date: Fri, 22 May 2020 17:07:48 +0200
Subject: [PATCH 1/3] Refactor to use the new Kafka consumer API

---
 .classpath | 31 --
 .dir-locals.el | 4 -
 .project | 23 -
 .travis.yml | 3 -
 README.md | 2 +-
 pom.xml | 66 ++-
 .../trans/kafka/consumer/KafkaConsumer.java | 306 ++++++------
 .../kafka/consumer/KafkaConsumerCallable.java | 73 ---
 .../kafka/consumer/KafkaConsumerData.java | 25 +-
 .../kafka/consumer/KafkaConsumerMeta.java | 465 +++++++++---------
 .../kafka/consumer/KafkaConsumerDialog.java | 55 ++-
 .../consumer/resources/kafka_consumer.png | Bin 1569 -> 1751 bytes
 .../kafka/consumer/KafkaConsumerDataTest.java | 6 +-
 .../kafka/consumer/KafkaConsumerMetaTest.java | 67 +--
 .../kafka/consumer/KafkaConsumerTest.java | 179 +++----
 15 files changed, 612 insertions(+), 693 deletions(-)
 delete mode 100644 .classpath
 delete mode 100644 .dir-locals.el
 delete mode 100644 .project
 delete mode 100644 .travis.yml
 delete mode 100644 src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerCallable.java

diff --git a/.classpath b/.classpath
deleted file mode 100644
index 6a6c8e3..0000000
--- a/.classpath
+++ /dev/null
@@ -1,31 +0,0 @@
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

diff --git a/.dir-locals.el b/.dir-locals.el
deleted file mode 100644
index 15aae5e..0000000
--- a/.dir-locals.el
+++ /dev/null
@@ -1,4 +0,0 @@
-((nil . ((indent-tabs-mode . t)
- (tab-width . 4)
- (c-basic-offset . 4)
- (fill-column . 80))))

diff --git a/.project b/.project
deleted file mode 100644
index 904397b..0000000
--- a/.project
+++ /dev/null
@@ -1,23 +0,0 @@
- - pentaho-kafka-consumer - - - - - - org.eclipse.jdt.core.javabuilder - - - - - org.eclipse.m2e.core.maven2Builder - - - - - - org.eclipse.jdt.core.javanature - org.eclipse.m2e.core.maven2Nature - -

diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 6a2ce34..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,3 +0,0 @@
-language: java
-notifications:
- email: false

diff --git a/README.md b/README.md
index b080c83..9fc1c34 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@ Apache Kafka consumer step plug-in for Pentaho Kettle.

 ### Apache Kafka Compatibility ###

-The consumer depends on Apache Kafka 0.8.1.1, which means that the broker must be of 0.8.x version or later.
+The consumer depends on Apache Kafka 1.1.0. Check the broker version to ensure compatibility.
If you want to build the plugin for a different Kafka version you have to modify the values of kafka.version and kafka.scala.version in the properties diff --git a/pom.xml b/pom.xml index 9403558..3559e31 100644 --- a/pom.xml +++ b/pom.xml @@ -4,34 +4,45 @@ 4.0.0 com.ruckuswireless pentaho-kafka-consumer - TRUNK-SNAPSHOT + 2.0.0-SNAPSHOT Apache Kafka Consumer Plug-In for Pentaho UTF-8 - 1.6 - 1.6 - 7.1.0.0-12 - 2.10 - 0.8.2.1 - ${maven.build.timestamp} + 1.8 + 1.8 + 9.0.0.0-423 + 2.12 + 1.1.0 4.12 1.6.6 + ${maven.build.timestamp} yyyyMMdd-HHmm pentaho-releases - http://nexus.pentaho.org/content/groups/omni + https://nexus.pentaho.org/repository/omni/ + + + maven-eclipse-repo + http://maven-eclipse.github.io/maven - - maven-eclipse-repo - http://maven-eclipse.github.io/maven - + + org.slf4j + slf4j-api + 1.7.25 + + + org.slf4j + slf4j-log4j12 + 1.7.25 + + org.apache.kafka kafka_${kafka.scala.version} @@ -51,6 +62,7 @@ + pentaho-kettle kettle-core @@ -69,10 +81,11 @@ ${kettle.version} provided + pentaho-kettle kettle-engine-test - ${kettle.version} + 7.1.0.32-246 test @@ -97,7 +110,9 @@ + ${project.artifactId} + src/main/resources @@ -116,17 +131,7 @@ - - org.pitest - pitest-maven - 1.2.3 - - - XML - HTML - - - + maven-dependency-plugin @@ -143,7 +148,6 @@ - org.apache.maven.plugins maven-assembly-plugin @@ -165,6 +169,18 @@ + + + org.pitest + pitest-maven + 1.2.3 + + + XML + HTML + + + org.sonarsource.scanner.maven sonar-maven-plugin diff --git a/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumer.java b/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumer.java index db46c6e..7350963 100644 --- a/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumer.java +++ b/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumer.java @@ -1,187 +1,209 @@ package org.pentaho.di.trans.kafka.consumer; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.KafkaStream; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.pentaho.di.core.exception.KettleException; +import org.pentaho.di.core.exception.KettleStepException; +import org.pentaho.di.core.exception.KettleValueException; import org.pentaho.di.core.row.RowDataUtil; import org.pentaho.di.core.row.RowMeta; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; -import org.pentaho.di.trans.step.*; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import org.pentaho.di.trans.step.BaseStep; +import org.pentaho.di.trans.step.StepDataInterface; +import org.pentaho.di.trans.step.StepInterface; +import org.pentaho.di.trans.step.StepMeta; +import org.pentaho.di.trans.step.StepMetaInterface; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collections; import java.util.Map.Entry; import java.util.Properties; -import java.util.concurrent.*; /** * Kafka Consumer step processor * * @author Michael Spector + * @author Miguel Ángel García */ -public class KafkaConsumer extends BaseStep implements StepInterface { - public static final String CONSUMER_TIMEOUT_KEY = "consumer.timeout.ms"; +public class KafkaConsumer extends BaseStep implements StepInterface +{ + private KafkaConsumerMeta meta; + private KafkaConsumerData data; + - public KafkaConsumer(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, - Trans trans) { + /** + * Default step constructor. 
+ */
+ public KafkaConsumer(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, Trans trans)
+ {
 super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
 }

- public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
- super.init(smi, sdi);
- KafkaConsumerMeta meta = (KafkaConsumerMeta) smi;
- KafkaConsumerData data = (KafkaConsumerData) sdi;
-
- Properties properties = meta.getKafkaProperties();
- Properties substProperties = new Properties();
- for (Entry e : properties.entrySet()) {
- substProperties.put(e.getKey(), environmentSubstitute(e.getValue().toString()));
+ /**
+ * Finalize step execution.
+ * Close the Kafka consumer.
+ *
+ * @param smi StepMetaInterface
+ * @param sdi StepDataInterface
+ */
+ @Override
+ public void dispose(StepMetaInterface smi, StepDataInterface sdi)
+ {
+ this.meta = (KafkaConsumerMeta) smi;
+ this.data = (KafkaConsumerData) sdi;
+ // Close the consumer.
+ if (data.consumer != null) {
+ data.consumer.close();
 }
- if (meta.isStopOnEmptyTopic()) {
+ super.dispose(smi, sdi);
+ }

- // If there isn't already a provided value, set a default of 1s
- if (!substProperties.containsKey(CONSUMER_TIMEOUT_KEY)) {
- substProperties.put(CONSUMER_TIMEOUT_KEY, "1000");
- }
- } else {
- if (substProperties.containsKey(CONSUMER_TIMEOUT_KEY)) {
- logError(Messages.getString("KafkaConsumer.WarnConsumerTimeout"));
+ /**
+ * Called when the user stops execution.
+ * Close the Kafka consumer.
+ *
+ * @param smi StepMetaInterface
+ * @param sdi StepDataInterface
+ * @throws KettleException When stopRunning fails.
+ */
+ @Override
+ public void stopRunning(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
+ {
+ this.meta = (KafkaConsumerMeta) smi;
+ this.data = (KafkaConsumerData) sdi;
+ // Close the Kafka consumer.
+ this.data.consumer.close();
+ // Stop the step.
+ super.stopRunning(smi, sdi);
+ }

+ /**
+ * Init the step.
+ * Connect to Kafka and subscribe to a topic.
+ *
+ * @param smi Step Meta interface.
+ * @param sdi Step Data interface.
+ * @return True when initialization succeeds, false otherwise.
+ */
+ @Override
+ public boolean init(StepMetaInterface smi, StepDataInterface sdi)
+ {
+ // Basic step init.
+ super.init(smi, sdi);
+ this.meta = (KafkaConsumerMeta) smi;
+ this.data = (KafkaConsumerData) sdi;
+
+ // Parse behaviour variables.
+ String topic = environmentSubstitute(this.meta.getTopic());
+ // Parse the configured properties, skipping the "(default)" placeholders.
+ Properties parsedProps = new Properties();
+ for (Entry e : this.meta.getKafkaProperties().entrySet()) {
+ if (!"(default)".equals(e.getValue().toString())) {
+ parsedProps.put(e.getKey(), environmentSubstitute(e.getValue().toString()));
 }
 }
- ConsumerConfig consumerConfig = new ConsumerConfig(substProperties);
- logBasic(Messages.getString("KafkaConsumer.CreateKafkaConsumer.Message", consumerConfig.zkConnect()));
- data.consumer = Consumer.createJavaConsumerConnector(consumerConfig);
- Map topicCountMap = new HashMap();
- String topic = environmentSubstitute(meta.getTopic());
- topicCountMap.put(topic, 1);
- Map>> streamsMap = data.consumer.createMessageStreams(topicCountMap);
- logDebug("Received streams map: " + streamsMap);
- data.streamIterator = streamsMap.get(topic).get(0).iterator();
+ // Log the effective consumer properties.
+ log.logDebug(parsedProps.toString());

- return true;
- }
+ // Avoid a ClassLoader error when loading the StringDeserializer.
+ Thread currentThread = Thread.currentThread();
+ ClassLoader savedClassLoader = currentThread.getContextClassLoader();
+ currentThread.setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
+ // Create the Kafka consumer.
+ this.data.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(parsedProps);
+ // Restore the ClassLoader context.
+ currentThread.setContextClassLoader(savedClassLoader);

- public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
- KafkaConsumerData data = (KafkaConsumerData) sdi;
- if (data.consumer != null) {
- data.consumer.shutdown();
+ // Subscribe to the topic.
+ this.data.consumer.subscribe(Collections.singletonList(topic));
- }
- super.dispose(smi, sdi);
+ // Init the time and message counters.
+ this.data.totalMessages = 0;
+ this.data.startedOn = Instant.now();
+
+ // Init OK.
+ return true;
 }

- public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
+ /**
+ * Called in a loop to consume messages and generate rows.
+ * The Kafka consumer step does not expect any incoming rows from previous steps:
+ * each call polls the topic and puts the received records into the row stream
+ * with putRow(), until a message limit or timeout is reached.
+ */
+ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
+ {
+ this.meta = (KafkaConsumerMeta) smi;
+ this.data = (KafkaConsumerData) sdi;
+
+ // Get the input row (null in our case).
 Object[] r = getRow();
- if (r == null) {
- /*
- * If we have no input rows, make sure we at least run once to
- * produce output rows. This allows us to consume without requiring
- * an input step.
- */
- if (!first) {
- setOutputDone();
- return false;
- }
- r = new Object[0];
- } else {
- incrementLinesRead();
- }
- final Object[] inputRow = r;
+ // Build the output row structure only on the first call.
+ if (this.first) {
+ this.first = false;
+ this.data.outputRowMeta = new RowMeta();
+ this.meta.getFields(this.data.outputRowMeta, this.getStepname(), null, null, this, this.repository, this.metaStore);
+ // Convert the limit and timeout settings.
+ this.data.maxMessages = Integer.parseInt(environmentSubstitute(this.meta.getLimit()));
+ this.data.maxTime = Long.parseLong(environmentSubstitute(this.meta.getTimeout()));
+ log.logDebug("Max messages set to " + this.data.maxMessages + " and timeout set to " + this.data.maxTime + " ms.");
+ }

- KafkaConsumerMeta meta = (KafkaConsumerMeta) smi;
- final KafkaConsumerData data = (KafkaConsumerData) sdi;
+ // Poll messages from the topic.
+ logDebug("Starting topic polling...");
+ ConsumerRecords messages = this.data.consumer.poll(10000);

- if (first) {
- first = false;
- data.inputRowMeta = getInputRowMeta();
- // No input rows means we just dummy data
- if (data.inputRowMeta == null) {
- data.outputRowMeta = new RowMeta();
- data.inputRowMeta = new RowMeta();
- } else {
- data.outputRowMeta = getInputRowMeta().clone();
- }
- meta.getFields(data.outputRowMeta, getStepname(), null, null, this, null, null);
+ // If no messages arrived and "stop on empty topic" is set, we have finished.
+ if (this.meta.isStopOnEmptyTopic() && messages.count() == 0) {
+ logDebug("No more messages to read.");
+ setOutputDone();
+ return false;
 }

- try {
- long timeout;
- String strData = meta.getTimeout();
-
- timeout = getTimeout(strData);
-
- logDebug("Starting message consumption with overall timeout of " + timeout + "ms");
-
- KafkaConsumerCallable kafkaConsumer = new KafkaConsumerCallable(meta, data, this) {
- protected void messageReceived(byte[] key, byte[] message) throws KettleException {
- Object[] newRow = RowDataUtil.addRowData(inputRow.clone(), data.inputRowMeta.size(),
- new Object[]{message, key});
- putRow(data.outputRowMeta, newRow);
-
- if (isRowLevel()) {
- logRowlevel(Messages.getString("KafkaConsumer.Log.OutputRow",
- Long.toString(getLinesWritten()), data.outputRowMeta.getString(newRow)));
- }
- }
- };
- if (timeout > 0) {
- logDebug("Starting timed consumption");
- ExecutorService executor = Executors.newSingleThreadExecutor();
- try {
- Future future = executor.submit(kafkaConsumer);
- executeFuture(timeout, future);
- } finally {
- executor.shutdown();
- }
- } else {
- logDebug("Starting direct consumption");
- kafkaConsumer.call();
+ // Output the messages.
+ log.logDebug("Polled " + messages.count() + " messages from the topic.");
+ messages.forEach(msg -> {
+ // Create a new row.
+ Object[] outputRow = RowDataUtil.allocateRowData(this.data.outputRowMeta.size());
+ outputRow[0] = msg.key();
+ outputRow[1] = msg.value();
+ // Output the row.
+ try {
+ this.putRow(this.data.outputRowMeta, this.data.outputRowMeta.cloneRow(outputRow));
+ log.logRowlevel(Messages.getString("KafkaConsumer.Log.OutputRow", Long.toString(getLinesWritten()), this.data.outputRowMeta.getString(outputRow)));
+ } catch (KettleStepException | KettleValueException e) {
+ log.logError("Unable to output the row.", e);
 }
- } catch (KettleException e) {
- if (!getStepMeta().isDoingErrorHandling()) {
- logError(Messages.getString("KafkaConsumer.ErrorInStepRunning", e.getMessage()));
- setErrors(1);
- stopAll();
- setOutputDone();
- return false;
- }
- putError(getInputRowMeta(), r, 1, e.toString(), null, getStepname());
- }
- return true;
- }
+ });

- private void executeFuture(long timeout, Future future) throws KettleException {
- try {
- future.get(timeout, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- logDebug("Timeout exception on the Future");
- } catch (Exception e) {
- throw new KettleException(e);
- }
- }
+ // Update the count of read messages.
+ this.data.totalMessages += messages.count();

- private long getTimeout(String strData) throws KettleException {
- long timeout;
- try {
- timeout = KafkaConsumerMeta.isEmpty(strData) ? 0 : Long.parseLong(environmentSubstitute(strData));
- } catch (NumberFormatException e) {
- throw new KettleException("Unable to parse step timeout value", e);
+ // Check the total number of messages read.
+ if ((this.data.maxMessages > 0) && (this.data.totalMessages >= this.data.maxMessages)) {
+ logDebug("Reached the maximum number of messages to read (" + this.data.totalMessages + ").");
+ setOutputDone();
+ return false;
 }
- return timeout;
- }

- public void stopRunning(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
+ // Check the elapsed read time.
+ if ((this.data.maxTime > 0) && (ChronoUnit.MILLIS.between(this.data.startedOn, Instant.now()) > this.data.maxTime)) {
+ logDebug("Read timeout reached (" + this.data.maxTime + " ms).");
+ setOutputDone();
+ return false;
+ }
- KafkaConsumerData data = (KafkaConsumerData) sdi;
- data.consumer.shutdown();
- data.canceled = true;
+ if (checkFeedback(getLinesRead())) {
+ logBasic("Lines read: " + getLinesRead());
+ }
- super.stopRunning(smi, sdi);
+ return true;
 }
+ }
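Note for reviewers: the step's new init() / processRow() / dispose() flow above maps onto the plain kafka-clients 1.1.0 API roughly as in the standalone sketch below. It is illustrative only and not part of the patch; the broker address, group id and topic name are placeholder values.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerLifecycleSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // bootstrap.servers (a broker address) replaces the old zookeeper.connect setting.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "pentaho-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // init(): create the consumer and subscribe to the topic.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("a-topic"));
        try {
            // processRow(): each call polls once and emits the received records as rows.
            for (int polls = 0; polls < 3; polls++) {
                ConsumerRecords<String, String> records = consumer.poll(10000L); // kafka-clients 1.1.0 signature
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("key=%s value=%s%n", record.key(), record.value());
                }
            }
        } finally {
            // dispose()/stopRunning(): always close to release sockets and trigger a clean rebalance.
            consumer.close();
        }
    }
}

The context-classloader swap in init() appears to be needed because the consumer resolves the configured deserializer classes through the thread context ClassLoader, which inside a Kettle plugin does not see the plugin's own jars.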
diff --git a/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerCallable.java b/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerCallable.java
deleted file mode 100644
index 8410283..0000000
--- a/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerCallable.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.pentaho.di.trans.kafka.consumer;
-
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.message.MessageAndMetadata;
-import org.pentaho.di.core.exception.KettleException;
-
-import java.util.concurrent.Callable;
-
-/**
- * Kafka reader callable
- *
- * @author Michael Spector
- */
-public abstract class KafkaConsumerCallable implements Callable {
-
- private KafkaConsumerData data;
- private KafkaConsumerMeta meta;
- private KafkaConsumer step;
-
- public KafkaConsumerCallable(KafkaConsumerMeta meta, KafkaConsumerData data, KafkaConsumer step) {
- this.meta = meta;
- this.data = data;
- this.step = step;
- }
-
- /**
- * Called when new message arrives from Kafka stream
- *
- * @param message Kafka message
- * @param key Kafka key
- */
- protected abstract void messageReceived(byte[] key, byte[] message) throws KettleException;
-
- public Object call() throws KettleException {
- try {
- long limit;
- String strData = meta.getLimit();
-
- limit = getLimit(strData);
- if (limit > 0) {
- step.logDebug("Collecting up to " + limit + " messages");
- } else {
- step.logDebug("Collecting unlimited messages");
- }
- while (data.streamIterator.hasNext() && !data.canceled && (limit <= 0 || data.processed < limit)) {
- MessageAndMetadata messageAndMetadata = data.streamIterator.next();
- messageReceived(messageAndMetadata.key(), messageAndMetadata.message());
- ++data.processed;
- }
- } catch (ConsumerTimeoutException cte) {
- step.logDebug("Received a consumer timeout after " + data.processed + " messages");
- if (!meta.isStopOnEmptyTopic()) {
- // Because we're not set to stop on empty, this is an abnormal
- // timeout
- throw new KettleException("Unexpected consumer timeout!", cte);
- }
- }
- // Notify that all messages were read successfully
- data.consumer.commitOffsets();
- step.setOutputDone();
- return null;
- }
-
- private long getLimit(String strData) throws KettleException {
- long limit;
- try {
- limit = KafkaConsumerMeta.isEmpty(strData) ?
0 : Long.parseLong(step.environmentSubstitute(strData)); - } catch (NumberFormatException e) { - throw new KettleException("Unable to parse messages limit parameter", e); - } - return limit; - } -} diff --git a/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerData.java b/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerData.java index 9669650..db6f0c1 100644 --- a/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerData.java +++ b/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerData.java @@ -1,22 +1,29 @@ package org.pentaho.di.trans.kafka.consumer; -import kafka.consumer.ConsumerIterator; -import kafka.javaapi.consumer.ConsumerConnector; +import org.apache.kafka.clients.consumer.Consumer; import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.trans.step.BaseStepData; import org.pentaho.di.trans.step.StepDataInterface; +import java.time.Instant; + /** * Holds data processed by this step * * @author Michael + * @author Miguel Ángel García */ -public class KafkaConsumerData extends BaseStepData implements StepDataInterface { - - ConsumerConnector consumer; - ConsumerIterator streamIterator; +public class KafkaConsumerData extends BaseStepData implements StepDataInterface +{ + // Kafka consumer. + Consumer consumer; + // Total messages read from a topic. + int totalMessages; + // Maximum messages to read. + int maxMessages; + // Maximum time to read. + long maxTime; + Instant startedOn; + // Output row meta. RowMetaInterface outputRowMeta; - RowMetaInterface inputRowMeta; - boolean canceled; - int processed; } diff --git a/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerMeta.java b/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerMeta.java index 0472d3e..50d82dd 100644 --- a/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerMeta.java +++ b/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerMeta.java @@ -1,6 +1,10 @@ package org.pentaho.di.trans.kafka.consumer; -import kafka.consumer.ConsumerConfig; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.pentaho.di.core.CheckResult; import org.pentaho.di.core.CheckResultInterface; import org.pentaho.di.core.Const; @@ -9,7 +13,6 @@ import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettlePluginException; import org.pentaho.di.core.exception.KettleStepException; -import org.pentaho.di.core.exception.KettleXMLException; import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.core.row.ValueMetaInterface; import org.pentaho.di.core.row.value.ValueMetaFactory; @@ -19,13 +22,16 @@ import org.pentaho.di.repository.Repository; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; -import org.pentaho.di.trans.step.*; +import org.pentaho.di.trans.step.BaseStepMeta; +import org.pentaho.di.trans.step.StepDataInterface; +import org.pentaho.di.trans.step.StepInterface; +import org.pentaho.di.trans.step.StepMeta; +import org.pentaho.di.trans.step.StepMetaInterface; import org.pentaho.metastore.api.IMetaStore; -import org.w3c.dom.Node; +import org.w3c.dom.Node; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -35,9 
+41,9 @@ * repository. * * @author Michael Spector + * @author Miguel Ángel García */ -@Step( - id = "KafkaConsumer", +@Step( id = "KafkaConsumer", image = "org/pentaho/di/trans/kafka/consumer/resources/kafka_consumer.png", i18nPackageName = "org.pentaho.di.trans.kafka.consumer", name = "KafkaConsumerDialog.Shell.Title", @@ -45,19 +51,29 @@ documentationUrl = "KafkaConsumerDialog.Shell.DocumentationURL", casesUrl = "KafkaConsumerDialog.Shell.CasesURL", categoryDescription = "i18n:org.pentaho.di.trans.step:BaseStep.Category.Input") -public class KafkaConsumerMeta extends BaseStepMeta implements StepMetaInterface { - +public class KafkaConsumerMeta extends BaseStepMeta implements StepMetaInterface +{ @SuppressWarnings("WeakerAccess") - protected static final String[] KAFKA_PROPERTIES_NAMES = new String[]{"zookeeper.connect", "group.id", "consumer.id", - "socket.timeout.ms", "socket.receive.buffer.bytes", "fetch.message.max.bytes", "auto.commit.interval.ms", - "queued.max.message.chunks", "rebalance.max.retries", "fetch.min.bytes", "fetch.wait.max.ms", - "rebalance.backoff.ms", "refresh.leader.backoff.ms", "auto.commit.enable", "auto.offset.reset", - "consumer.timeout.ms", "client.id", "zookeeper.session.timeout.ms", "zookeeper.connection.timeout.ms", - "zookeeper.sync.time.ms"}; + protected static final String[] KAFKA_PROPERTIES_NAMES = new String[]{ + // Server props. + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, + ConsumerConfig.GROUP_ID_CONFIG, ConsumerConfig.CLIENT_ID_CONFIG, + // Fetching props. + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, + ConsumerConfig.FETCH_MIN_BYTES_CONFIG, ConsumerConfig.FETCH_MAX_BYTES_CONFIG, + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + // Security props. + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, + SaslConfigs.SASL_MECHANISM, SaslConfigs.SASL_JAAS_CONFIG + }; - @SuppressWarnings("WeakerAccess") - protected static final Map KAFKA_PROPERTIES_DEFAULTS = new HashMap(); + private Properties kafkaProperties = new Properties(); + // Kafka consumer step behaviour keys. private static final String ATTR_TOPIC = "TOPIC"; private static final String ATTR_FIELD = "FIELD"; private static final String ATTR_KEY_FIELD = "KEY_FIELD"; @@ -65,13 +81,7 @@ public class KafkaConsumerMeta extends BaseStepMeta implements StepMetaInterface private static final String ATTR_TIMEOUT = "TIMEOUT"; private static final String ATTR_STOP_ON_EMPTY_TOPIC = "STOP_ON_EMPTY_TOPIC"; private static final String ATTR_KAFKA = "KAFKA"; - - static { - KAFKA_PROPERTIES_DEFAULTS.put("zookeeper.connect", "localhost:2181"); - KAFKA_PROPERTIES_DEFAULTS.put("group.id", "group"); - } - - private Properties kafkaProperties = new Properties(); + // Kafka consumer step behaviour vars. 
private String topic; private String field; private String keyField; @@ -79,208 +89,102 @@ public class KafkaConsumerMeta extends BaseStepMeta implements StepMetaInterface private String timeout; private boolean stopOnEmptyTopic; - public static String[] getKafkaPropertiesNames() { - return KAFKA_PROPERTIES_NAMES; - } - - public static Map getKafkaPropertiesDefaults() { - return KAFKA_PROPERTIES_DEFAULTS; - } - - public KafkaConsumerMeta() { - super(); - } - - public Properties getKafkaProperties() { - return kafkaProperties; - } - - @SuppressWarnings("unused") - public Map getKafkaPropertiesMap() { - return getKafkaProperties(); - } - - public void setKafkaProperties(Properties kafkaProperties) { - this.kafkaProperties = kafkaProperties; - } - - @SuppressWarnings("unused") - public void setKafkaPropertiesMap(Map propertiesMap) { - Properties props = new Properties(); - props.putAll(propertiesMap); - setKafkaProperties(props); - } - - /** - * @return Kafka topic name - */ - public String getTopic() { - return topic; - } - - /** - * @param topic Kafka topic name - */ - public void setTopic(String topic) { - this.topic = topic; - } - - /** - * @return Target field name in Kettle stream - */ - public String getField() { - return field; - } - - /** - * @param field Target field name in Kettle stream - */ - public void setField(String field) { - this.field = field; - } - - /** - * @return Target key field name in Kettle stream - */ - public String getKeyField() { - return keyField; - } - - /** - * @param keyField Target key field name in Kettle stream - */ - public void setKeyField(String keyField) { - this.keyField = keyField; - } - - /** - * @return Limit number of entries to read from Kafka queue - */ - public String getLimit() { - return limit; - } - - /** - * @param limit Limit number of entries to read from Kafka queue - */ - public void setLimit(String limit) { - this.limit = limit; - } - - /** - * @return Time limit for reading entries from Kafka queue (in ms) - */ - public String getTimeout() { - return timeout; - } /** - * @param timeout Time limit for reading entries from Kafka queue (in ms) + * Default constructor. */ - public void setTimeout(String timeout) { - this.timeout = timeout; + public KafkaConsumerMeta() + { + super(); } - /** - * @return 'true' if the consumer should stop when no more messages are - * available - */ - public boolean isStopOnEmptyTopic() { - return stopOnEmptyTopic; - } /** - * @param stopOnEmptyTopic If 'true', stop the consumer when no more messages are - * available on the topic + * Checks the step when a user presses on the check transformation button. 
*/ - public void setStopOnEmptyTopic(boolean stopOnEmptyTopic) { - this.stopOnEmptyTopic = stopOnEmptyTopic; - } - + @Override public void check(List remarks, TransMeta transMeta, StepMeta stepMeta, RowMetaInterface prev, String[] input, String[] output, RowMetaInterface info, VariableSpace space, Repository repository, - IMetaStore metaStore) { - - if (topic == null) { + IMetaStore metaStore) + { + if (this.topic == null) { remarks.add(new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, Messages.getString("KafkaConsumerMeta.Check.InvalidTopic"), stepMeta)); } - if (field == null) { + if (this.field == null) { remarks.add(new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, Messages.getString("KafkaConsumerMeta.Check.InvalidField"), stepMeta)); } - if (keyField == null) { + if (this.keyField == null) { remarks.add(new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, Messages.getString("KafkaConsumerMeta.Check.InvalidKeyField"), stepMeta)); } try { - new ConsumerConfig(kafkaProperties); - } catch (IllegalArgumentException e) { + new org.apache.kafka.clients.consumer.KafkaConsumer<>(this.kafkaProperties); + } catch (ConfigException e) { remarks.add(new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, e.getMessage(), stepMeta)); } } - public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int cnr, TransMeta transMeta, - Trans trans) { + @Override + public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int cnr, TransMeta transMeta, Trans trans) + { return new KafkaConsumer(stepMeta, stepDataInterface, cnr, transMeta, trans); } - public StepDataInterface getStepData() { + @Override + public StepDataInterface getStepData() + { return new KafkaConsumerData(); } @Override public void loadXML(Node stepnode, List databases, IMetaStore metaStore) - throws KettleXMLException { - - try { - topic = XMLHandler.getTagValue(stepnode, ATTR_TOPIC); - field = XMLHandler.getTagValue(stepnode, ATTR_FIELD); - keyField = XMLHandler.getTagValue(stepnode, ATTR_KEY_FIELD); - limit = XMLHandler.getTagValue(stepnode, ATTR_LIMIT); - timeout = XMLHandler.getTagValue(stepnode, ATTR_TIMEOUT); - // This tag only exists if the value is "true", so we can directly - // populate the field - stopOnEmptyTopic = XMLHandler.getTagValue(stepnode, ATTR_STOP_ON_EMPTY_TOPIC) != null; - Node kafkaNode = XMLHandler.getSubNode(stepnode, ATTR_KAFKA); - String[] kafkaElements = XMLHandler.getNodeElements(kafkaNode); - if (kafkaElements != null) { - for (String propName : kafkaElements) { - String value = XMLHandler.getTagValue(kafkaNode, propName); - if (value != null) { - kafkaProperties.put(propName, value); - } + { + this.topic = XMLHandler.getTagValue(stepnode, ATTR_TOPIC); + this.field = XMLHandler.getTagValue(stepnode, ATTR_FIELD); + this.keyField = XMLHandler.getTagValue(stepnode, ATTR_KEY_FIELD); + this.limit = XMLHandler.getTagValue(stepnode, ATTR_LIMIT); + this.timeout = XMLHandler.getTagValue(stepnode, ATTR_TIMEOUT); + // This tag only exists if the value is "true", so we can directly populate the field. 
+ this.stopOnEmptyTopic = XMLHandler.getTagValue(stepnode, ATTR_STOP_ON_EMPTY_TOPIC) != null; + Node kafkaNode = XMLHandler.getSubNode(stepnode, ATTR_KAFKA); + String[] kafkaElements = XMLHandler.getNodeElements(kafkaNode); + if (kafkaElements != null) { + for (String propName : kafkaElements) { + String value = XMLHandler.getTagValue(kafkaNode, propName); + if (value != null) { + this.kafkaProperties.put(propName, value); } } - } catch (Exception e) { - throw new KettleXMLException(Messages.getString("KafkaConsumerMeta.Exception.loadXml"), e); } } @Override - public String getXML() throws KettleException { + public String getXML() + { StringBuilder retval = new StringBuilder(); - if (topic != null) { + if (this.topic != null) { retval.append(" ").append(XMLHandler.addTagValue(ATTR_TOPIC, topic)); } - if (field != null) { + if (this.field != null) { retval.append(" ").append(XMLHandler.addTagValue(ATTR_FIELD, field)); } - if (keyField != null) { + if (this.keyField != null) { retval.append(" ").append(XMLHandler.addTagValue(ATTR_KEY_FIELD, keyField)); } - if (limit != null) { + if (this.limit != null) { retval.append(" ").append(XMLHandler.addTagValue(ATTR_LIMIT, limit)); } - if (timeout != null) { + if (this.timeout != null) { retval.append(" ").append(XMLHandler.addTagValue(ATTR_TIMEOUT, timeout)); } - if (stopOnEmptyTopic) { + if (this.stopOnEmptyTopic) { retval.append(" ").append(XMLHandler.addTagValue(ATTR_STOP_ON_EMPTY_TOPIC, "true")); } retval.append(" ").append(XMLHandler.openTag(ATTR_KAFKA)).append(Const.CR); - for (String name : kafkaProperties.stringPropertyNames()) { - String value = kafkaProperties.getProperty(name); + for (String name : this.kafkaProperties.stringPropertyNames()) { + String value = this.kafkaProperties.getProperty(name); if (value != null) { retval.append(" ").append(XMLHandler.addTagValue(name, value)); } @@ -290,25 +194,18 @@ public String getXML() throws KettleException { } @Override - public void readRep(Repository rep, IMetaStore metaStore, ObjectId stepId, List databases) - throws KettleException { + public void readRep(Repository rep, IMetaStore metaStore, ObjectId stepId, List databases) throws KettleException + { try { - topic = rep.getStepAttributeString(stepId, ATTR_TOPIC); - field = rep.getStepAttributeString(stepId, ATTR_FIELD); - keyField = rep.getStepAttributeString(stepId, ATTR_KEY_FIELD); - limit = rep.getStepAttributeString(stepId, ATTR_LIMIT); - timeout = rep.getStepAttributeString(stepId, ATTR_TIMEOUT); - stopOnEmptyTopic = rep.getStepAttributeBoolean(stepId, ATTR_STOP_ON_EMPTY_TOPIC); + this.topic = rep.getStepAttributeString(stepId, ATTR_TOPIC); + this.field = rep.getStepAttributeString(stepId, ATTR_FIELD); + this.keyField = rep.getStepAttributeString(stepId, ATTR_KEY_FIELD); + this.limit = rep.getStepAttributeString(stepId, ATTR_LIMIT); + this.timeout = rep.getStepAttributeString(stepId, ATTR_TIMEOUT); + this.stopOnEmptyTopic = rep.getStepAttributeBoolean(stepId, ATTR_STOP_ON_EMPTY_TOPIC); String kafkaPropsXML = rep.getStepAttributeString(stepId, ATTR_KAFKA); if (kafkaPropsXML != null) { - kafkaProperties.loadFromXML(new ByteArrayInputStream(kafkaPropsXML.getBytes())); - } - // Support old versions: - for (String name : KAFKA_PROPERTIES_NAMES) { - String value = rep.getStepAttributeString(stepId, name); - if (value != null) { - kafkaProperties.put(name, value); - } + this.kafkaProperties.loadFromXML(new ByteArrayInputStream(kafkaPropsXML.getBytes())); } } catch (Exception e) { throw new 
KettleException("KafkaConsumerMeta.Exception.loadRep", e); @@ -316,27 +213,28 @@ public void readRep(Repository rep, IMetaStore metaStore, ObjectId stepId, List< } @Override - public void saveRep(Repository rep, IMetaStore metaStore, ObjectId transformationId, ObjectId stepId) throws KettleException { + public void saveRep(Repository rep, IMetaStore metaStore, ObjectId transformationId, ObjectId stepId) throws KettleException + { try { - if (topic != null) { - rep.saveStepAttribute(transformationId, stepId, ATTR_TOPIC, topic); + if (this.topic != null) { + rep.saveStepAttribute(transformationId, stepId, ATTR_TOPIC, this.topic); } - if (field != null) { - rep.saveStepAttribute(transformationId, stepId, ATTR_FIELD, field); + if (this.field != null) { + rep.saveStepAttribute(transformationId, stepId, ATTR_FIELD, this.field); } - if (keyField != null) { - rep.saveStepAttribute(transformationId, stepId, ATTR_KEY_FIELD, keyField); + if (this.keyField != null) { + rep.saveStepAttribute(transformationId, stepId, ATTR_KEY_FIELD, this.keyField); } - if (limit != null) { - rep.saveStepAttribute(transformationId, stepId, ATTR_LIMIT, limit); + if (this.limit != null) { + rep.saveStepAttribute(transformationId, stepId, ATTR_LIMIT, this.limit); } - if (timeout != null) { - rep.saveStepAttribute(transformationId, stepId, ATTR_TIMEOUT, timeout); + if (this.timeout != null) { + rep.saveStepAttribute(transformationId, stepId, ATTR_TIMEOUT, this.timeout); } - rep.saveStepAttribute(transformationId, stepId, ATTR_STOP_ON_EMPTY_TOPIC, stopOnEmptyTopic); + rep.saveStepAttribute(transformationId, stepId, ATTR_STOP_ON_EMPTY_TOPIC, this.stopOnEmptyTopic); ByteArrayOutputStream buf = new ByteArrayOutputStream(); - kafkaProperties.storeToXML(buf, null); + this.kafkaProperties.storeToXML(buf, null); rep.saveStepAttribute(transformationId, stepId, ATTR_KAFKA, buf.toString()); } catch (Exception e) { throw new KettleException("KafkaConsumerMeta.Exception.saveRep", e); @@ -344,32 +242,161 @@ public void saveRep(Repository rep, IMetaStore metaStore, ObjectId transformatio } /** - * Set default values to the transformation + * Set default values to the transformation. */ - public void setDefault() { - setTopic(""); + @Override + public void setDefault() + { + this.topic = ""; + this.keyField = "key"; + this.field = "msg"; + this.limit = "0"; + this.timeout = "0"; } - public void getFields(RowMetaInterface rowMeta, String origin, RowMetaInterface[] info, StepMeta nextStep, - VariableSpace space, Repository repository, IMetaStore metaStore) throws KettleStepException { - + /** + * Modifies the input row adding new fields to the row structure. + */ + @Override + public void getFields(RowMetaInterface rowMeta, String name, RowMetaInterface[] info, StepMeta nextStep, + VariableSpace space, Repository repository, IMetaStore metaStore) throws KettleStepException + { try { - ValueMetaInterface fieldValueMeta = ValueMetaFactory.createValueMeta(getField(), ValueMetaInterface.TYPE_BINARY); - fieldValueMeta.setOrigin(origin); - rowMeta.addValueMeta(fieldValueMeta); - - ValueMetaInterface keyFieldValueMeta = ValueMetaFactory.createValueMeta(getKeyField(), ValueMetaInterface.TYPE_BINARY); - keyFieldValueMeta.setOrigin(origin); - rowMeta.addValueMeta(keyFieldValueMeta); - + // Add the key field to the row. + ValueMetaInterface keyValueMeta = ValueMetaFactory.createValueMeta(this.keyField, ValueMetaInterface.TYPE_STRING); + keyValueMeta.setOrigin(name); + rowMeta.addValueMeta(keyValueMeta); + // Add the message field to the row. 
+ ValueMetaInterface messageValueMeta = ValueMetaFactory.createValueMeta(this.field, ValueMetaInterface.TYPE_STRING);
+ messageValueMeta.setOrigin(name);
+ rowMeta.addValueMeta(messageValueMeta);
 } catch (KettlePluginException e) {
 throw new KettleStepException("KafkaConsumerMeta.Exception.getFields", e);
 }
+ }

+ /**
+ * Gets the default value for a given property key.
+ */
+ public static String defaultProp(String prop)
+ {
+ switch (prop) {
+ case ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG:
+ return "localhost:9092";
+ case ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG:
+ case ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG:
+ return "org.apache.kafka.common.serialization.StringDeserializer";
+ case ConsumerConfig.AUTO_OFFSET_RESET_CONFIG:
+ return "earliest";
+ case ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG:
+ return "true";
+ case ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG:
+ return "10000";
+ case ConsumerConfig.MAX_POLL_RECORDS_CONFIG:
+ return "1";
+ case CommonClientConfigs.SECURITY_PROTOCOL_CONFIG:
+ return "SASL_SSL";
+ case SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG:
+ return "/var/www/data/truststore.jks";
+ case SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG:
+ return "1234567890";
+ case SaslConfigs.SASL_MECHANISM:
+ return "PLAIN";
+ case SaslConfigs.SASL_JAAS_CONFIG:
+ return "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"mr.robot\" password=\"12345\" serviceName=\"kafka\";";
+ default:
+ return "(default)";
+ }
+ }

+ // Getters & Setters.

+ @SuppressWarnings("unused")
+ public Map getKafkaPropertiesMap()
+ {
+ return this.getKafkaProperties();
+ }

+ @SuppressWarnings("unused")
+ public void setKafkaPropertiesMap(Map propertiesMap)
+ {
+ Properties props = new Properties();
+ props.putAll(propertiesMap);
+ this.kafkaProperties = props;
+ }

+ public static String[] getKafkaPropertiesNames()
+ {
+ return KafkaConsumerMeta.KAFKA_PROPERTIES_NAMES;
+ }

+ public Properties getKafkaProperties()
+ {
+ return this.kafkaProperties;
+ }

+ public void setKafkaProperties(Properties kafkaProperties)
+ {
+ this.kafkaProperties = kafkaProperties;
+ }

+ public String getTopic()
+ {
+ return this.topic;
+ }

+ public void setTopic(String topic)
+ {
+ this.topic = topic;
+ }

+ public String getField()
+ {
+ return this.field;
+ }

+ public void setField(String field)
+ {
+ this.field = field;
+ }

+ public String getKeyField()
+ {
+ return this.keyField;
+ }

+ public void setKeyField(String keyField)
+ {
+ this.keyField = keyField;
+ }

+ public String getLimit()
+ {
+ return this.limit;
+ }

+ public void setLimit(String limit)
+ {
+ this.limit = limit;
 }

- public static boolean isEmpty(String str) {
- return str == null || str.length() == 0;
+ public String getTimeout()
+ {
+ return this.timeout;
 }

+ public void setTimeout(String timeout)
+ {
+ this.timeout = timeout;
+ }

+ public boolean isStopOnEmptyTopic()
+ {
+ return this.stopOnEmptyTopic;
+ }

+ public void setStopOnEmptyTopic(boolean stopOnEmptyTopic) {
+ this.stopOnEmptyTopic = stopOnEmptyTopic;
+ }
 }
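Note for reviewers: the SASL/SSL entries in defaultProp() above are dialog placeholders rather than working values. For context, a complete secured consumer configuration built from the same constants would look roughly like the following sketch; the broker address, truststore path and credentials are invented examples.

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class SecureConsumerConfigSketch {
    public static Properties secureProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker.example.com:9093"); // TLS listener
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "pentaho-group");
        // Authenticate over SASL_SSL with the PLAIN mechanism.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"alice\" password=\"secret\";");
        // Trust the broker certificate through a local truststore.
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/kafka/truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
        return props;
    }
}

Only the security protocol, SASL settings and truststore entries differ from a plaintext setup; the rest of the consumer configuration is unchanged.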
diff --git a/src/main/java/org/pentaho/di/ui/trans/kafka/consumer/KafkaConsumerDialog.java b/src/main/java/org/pentaho/di/ui/trans/kafka/consumer/KafkaConsumerDialog.java
index 5d96320..b2f74d3 100644
--- a/src/main/java/org/pentaho/di/ui/trans/kafka/consumer/KafkaConsumerDialog.java
+++ b/src/main/java/org/pentaho/di/ui/trans/kafka/consumer/KafkaConsumerDialog.java
@@ -1,11 +1,24 @@
 package org.pentaho.di.ui.trans.kafka.consumer;

 import org.eclipse.swt.SWT;
-import org.eclipse.swt.events.*;
+import org.eclipse.swt.events.ModifyEvent;
+import org.eclipse.swt.events.ModifyListener;
+import org.eclipse.swt.events.SelectionAdapter;
+import org.eclipse.swt.events.SelectionEvent;
+import org.eclipse.swt.events.ShellAdapter;
+import org.eclipse.swt.events.ShellEvent;
 import org.eclipse.swt.layout.FormAttachment;
 import org.eclipse.swt.layout.FormData;
 import org.eclipse.swt.layout.FormLayout;
-import org.eclipse.swt.widgets.*;
+import org.eclipse.swt.widgets.Button;
+import org.eclipse.swt.widgets.Control;
+import org.eclipse.swt.widgets.Display;
+import org.eclipse.swt.widgets.Event;
+import org.eclipse.swt.widgets.Label;
+import org.eclipse.swt.widgets.Listener;
+import org.eclipse.swt.widgets.Shell;
+import org.eclipse.swt.widgets.TableItem;
+import org.eclipse.swt.widgets.Text;
 import org.pentaho.di.core.Const;
 import org.pentaho.di.i18n.BaseMessages;
 import org.pentaho.di.trans.TransMeta;
@@ -26,9 +39,10 @@
 * UI for the Kafka Consumer step
 *
 * @author Michael Spector
+ * @author Miguel Ángel García
 */
-public class KafkaConsumerDialog extends BaseStepDialog implements StepDialogInterface {
-
+public class KafkaConsumerDialog extends BaseStepDialog implements StepDialogInterface
+{
 private KafkaConsumerMeta consumerMeta;
 private TextVar wTopicName;
 private TextVar wFieldName;
@@ -38,22 +52,26 @@
 private TextVar wTimeout;
 private Button wStopOnEmptyTopic;

- public KafkaConsumerDialog(Shell parent, Object in, TransMeta tr, String sname) {
+ public KafkaConsumerDialog(Shell parent, Object in, TransMeta tr, String sname)
+ {
 super(parent, (BaseStepMeta) in, tr, sname);
 consumerMeta = (KafkaConsumerMeta) in;
 }

- public KafkaConsumerDialog(Shell parent, BaseStepMeta baseStepMeta, TransMeta transMeta, String stepname) {
+ public KafkaConsumerDialog(Shell parent, BaseStepMeta baseStepMeta, TransMeta transMeta, String stepname)
+ {
 super(parent, baseStepMeta, transMeta, stepname);
 consumerMeta = (KafkaConsumerMeta) baseStepMeta;
 }

- public KafkaConsumerDialog(Shell parent, int nr, BaseStepMeta in, TransMeta tr) {
+ public KafkaConsumerDialog(Shell parent, int nr, BaseStepMeta in, TransMeta tr)
+ {
 super(parent, nr, in, tr);
 consumerMeta = (KafkaConsumerMeta) in;
 }

- public String open() {
+ public String open()
+ {
 Shell parent = getParent();
 Display display = parent.getDisplay();
@@ -284,7 +302,8 @@ public void shellClosed(ShellEvent e) {
 /**
 * Copy information from the meta-data input to the dialog fields.
 */
- private void getData(KafkaConsumerMeta consumerMeta, boolean copyStepname) {
+ private void getData(KafkaConsumerMeta consumerMeta, boolean copyStepname)
+ {
 if (copyStepname) {
 wStepname.setText(stepname);
 }
@@ -306,10 +325,7 @@ private void getData(KafkaConsumerMeta consumerMeta, boolean copyStepname) {
 TableItem item = new TableItem(wProps.table, i++ > 1 ?
SWT.BOLD : SWT.NONE);
 int colnr = 1;
 item.setText(colnr++, Const.NVL(propName, ""));
- String defaultValue = KafkaConsumerMeta.getKafkaPropertiesDefaults().get(propName);
- if (defaultValue == null) {
- defaultValue = "(default)";
- }
+ String defaultValue = KafkaConsumerMeta.defaultProp(propName);
 item.setText(colnr++, Const.NVL(value, defaultValue));
 }

@@ -320,7 +336,8 @@ private void getData(KafkaConsumerMeta consumerMeta, boolean copyStepname) {
 wStepname.selectAll();
 }

- private void cancel() {
+ private void cancel()
+ {
 stepname = null;
 consumerMeta.setChanged(changed);
 dispose();
@@ -329,7 +346,8 @@ private void cancel() {
 /**
 * Copy information from the dialog fields to the meta-data input
 */
- private void setData(KafkaConsumerMeta consumerMeta) {
+ private void setData(KafkaConsumerMeta consumerMeta)
+ {
 consumerMeta.setTopic(wTopicName.getText());
 consumerMeta.setField(wFieldName.getText());
 consumerMeta.setKeyField(wKeyFieldName.getText());
@@ -344,7 +362,7 @@ private void setData(KafkaConsumerMeta consumerMeta) {
 int colnr = 1;
 String name = item.getText(colnr++);
 String value = item.getText(colnr++).trim();
- if (value.length() > 0 && !"(default)".equals(value)) {
+ if (value.length() > 0) {
 kafkaProperties.put(name, value);
 } else {
 kafkaProperties.remove(name);
@@ -357,8 +375,9 @@ private void setData(KafkaConsumerMeta consumerMeta) {
 consumerMeta.setChanged();
 }

- private void ok() {
- if (KafkaConsumerMeta.isEmpty(wStepname.getText())) {
+ private void ok()
+ {
+ if (wStepname.getText() == null || wStepname.getText().length() == 0) {
 return;
 }
 setData(consumerMeta);
diff --git a/src/main/resources/org/pentaho/di/trans/kafka/consumer/resources/kafka_consumer.png b/src/main/resources/org/pentaho/di/trans/kafka/consumer/resources/kafka_consumer.png
index 88c37329854ab8afec0e801e970f5e3288385125..c51709471c52ea4832be5adf206a3960eb18bd55 100644
GIT binary patch
literal 1751 (binary data not shown)
delta 1563 (binary data not shown)

diff --git a/src/test/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerDataTest.java b/src/test/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerDataTest.java
index 51d03ad..7386e65 100644
--- a/src/test/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerDataTest.java
+++ b/src/test/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerDataTest.java
@@ -3,9 +3,11 @@
 import org.junit.Assert;
 import org.junit.Test;

-public class KafkaConsumerDataTest {
+public class KafkaConsumerDataTest
+{
 @Test
- public void testDefaults() {
+ public void testDefaults()
+ {
 KafkaConsumerData data = new KafkaConsumerData();
 Assert.assertNull(data.outputRowMeta);
 }

diff --git a/src/test/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerMetaTest.java b/src/test/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerMetaTest.java
index d50f113..12ac914 100644
--- a/src/test/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerMetaTest.java
+++ b/src/test/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumerMetaTest.java
@@ -21,26 +21,32 @@
 import org.pentaho.di.trans.steps.loadsave.validator.MapLoadSaveValidator;
 import org.pentaho.di.trans.steps.loadsave.validator.StringLoadSaveValidator;

-import java.util.*;
-
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import static org.junit.Assert.*;

-public class KafkaConsumerMetaTest {
-
+public class KafkaConsumerMetaTest
+{
 @BeforeClass
- public static void setUpBeforeClass() throws KettleException {
+ public static void setUpBeforeClass() throws KettleException
+ {
 KettleEnvironment.init(false);
 }

 @Test
- public void testGetStepData() {
+ public void testGetStepData()
+ {
 KafkaConsumerMeta m = new KafkaConsumerMeta();
 assertEquals(KafkaConsumerData.class, m.getStepData().getClass());
 }

 @Test
- public void testStepAnnotations() {
-
+ public void testStepAnnotations()
+ {
 // PDI Plugin Annotation-based Classloader checks
 Step stepAnnotation = KafkaConsumerMeta.class.getAnnotation(Step.class);
 assertNotNull(stepAnnotation);
@@ -60,7 +66,8 @@ public void testStepAnnotations() {
 }

 @Test
- public void testDefaults() throws KettleStepException {
+ public void testDefaults() throws KettleStepException
+ {
 KafkaConsumerMeta m = new KafkaConsumerMeta();
 m.setDefault();
@@ -71,8 +78,8 @@ public void testDefaults() throws KettleStepException {
 assertEquals(2, rowMeta.size());

 // those fields must be strings and named as configured
- assertEquals(ValueMetaInterface.TYPE_BINARY, rowMeta.getValueMeta(0).getType()); // TODO change to string
- assertEquals(ValueMetaInterface.TYPE_BINARY, rowMeta.getValueMeta(1).getType()); // TODO change to string
+ assertEquals(ValueMetaInterface.TYPE_STRING, rowMeta.getValueMeta(0).getType());
+ assertEquals(ValueMetaInterface.TYPE_STRING, rowMeta.getValueMeta(1).getType());
 assertEquals(ValueMetaInterface.STORAGE_TYPE_NORMAL, rowMeta.getValueMeta(0).getStorageType());
 assertEquals(ValueMetaInterface.STORAGE_TYPE_NORMAL, rowMeta.getValueMeta(1).getStorageType());
 // TODO check naming
 }

 @Test
- public void testLoadSave() throws KettleException {
-
+ public void testLoadSave() throws KettleException
+ {
 List attributes = Arrays.asList("topic", "field", "keyField", "limit", "timeout", "kafka", "stopOnEmptyTopic");

- Map getterMap = new HashMap();
+ Map getterMap = new HashMap<>();
 getterMap.put("topic", "getTopic");
 getterMap.put("field", "getField");
 getterMap.put("keyField", "getKeyField");
@@ -93,7 +100,7 @@
 getterMap.put("kafka", "getKafkaPropertiesMap");
 getterMap.put("stopOnEmptyTopic", "isStopOnEmptyTopic");

- Map setterMap = new HashMap();
+ Map setterMap = new HashMap<>();
 setterMap.put("topic", "setTopic");
 setterMap.put("field", "setField");
 setterMap.put("keyField", "setKeyField");
@@ -102,11 +109,9 @@
 setterMap.put("kafka", "setKafkaPropertiesMap");
 setterMap.put("stopOnEmptyTopic", "setStopOnEmptyTopic");

- Map> fieldLoadSaveValidatorAttributeMap =
- new HashMap>();
- Map> fieldLoadSaveValidatorTypeMap =
- new HashMap>();
+ Map> fieldLoadSaveValidatorAttributeMap = new HashMap<>();
+ Map> fieldLoadSaveValidatorTypeMap = new HashMap<>();
 fieldLoadSaveValidatorAttributeMap.put("kafka", new MapLoadSaveValidator<>(
 new KeyStringLoadSaveValidator(), new StringLoadSaveValidator()));
 LoadSaveTester tester = new LoadSaveTester(KafkaConsumerMeta.class, attributes, getterMap, setterMap,
 fieldLoadSaveValidatorAttributeMap, fieldLoadSaveValidatorTypeMap);
@@ -115,11 +120,12 @@
 }

 @Test
- public void testChecksEmpty() {
+ public void testChecksEmpty()
+ {
 KafkaConsumerMeta m = new KafkaConsumerMeta();

 // Test missing Topic name
- List checkResults = new ArrayList();
+ List checkResults = new ArrayList<>();
 m.check(checkResults, new TransMeta(), new StepMeta(), null, null, null, null, new Variables(), new MemoryRepository(), null);
 assertFalse(checkResults.isEmpty());
 boolean foundMatch = false;
@@ -153,14 +159,15 @@ public void testChecksEmpty() {
 }

 @Test
- public void testChecksNotEmpty() {
+ public void testChecksNotEmpty()
+ {
 KafkaConsumerMeta m = new KafkaConsumerMeta();
 m.setTopic(UUID.randomUUID().toString());
 m.setField(UUID.randomUUID().toString());
 m.setKeyField(UUID.randomUUID().toString());

 // Test present Topic name
- List checkResults = new ArrayList();
+ List checkResults = new ArrayList<>();
 m.check(checkResults, new TransMeta(), new StepMeta(), null, null, null, null, new Variables(), new MemoryRepository(), null);
 assertFalse(checkResults.isEmpty());
 boolean foundMatch = false;
@@ -194,23 +201,19 @@
 }

- @Test
- public void testIsEmpty() {
- assertTrue("isEmpty should return true with empty string", KafkaConsumerMeta.isEmpty(""));
- assertTrue("isEmpty should return true with null string", KafkaConsumerMeta.isEmpty(null));
- }
-
 /**
 * Private class to generate alphabetic xml tags
 */
- private class KeyStringLoadSaveValidator extends StringLoadSaveValidator {
+ private class KeyStringLoadSaveValidator extends StringLoadSaveValidator
+ {
 @Override
 public String getTestObject() {
 return "k" + UUID.randomUUID().toString();
 }
 }

- private void hasi18nValue(String i18nPackageName, String messageId) {
+ private void hasi18nValue(String i18nPackageName, String messageId)
+ {
 String fakeId = UUID.randomUUID().toString();
 String fakeLocalized = BaseMessages.getString(i18nPackageName, fakeId);
 assertEquals("The way to identify a missing localization key has changed", "!" + fakeId + "!", fakeLocalized);
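Note for reviewers: the rewritten KafkaConsumerTest below stubs consumer.poll() instead of the old stream iterator. For context, a batch for such a stub can be assembled from plain kafka-clients types as in this sketch; the topic name and the makeRecords helper are illustrative, not the patch's own generateRecords() implementation (which is elided here).

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class RecordStubSketch {
    /** Builds a ConsumerRecords batch with a single record, as a poll() stub might return. */
    static ConsumerRecords<String, String> makeRecords(String topic) {
        TopicPartition partition = new TopicPartition(topic, 0);
        List<ConsumerRecord<String, String>> batch = Collections.singletonList(
                new ConsumerRecord<>(topic, 0, 0L, "aKey", "aMessage"));
        Map<TopicPartition, List<ConsumerRecord<String, String>>> byPartition =
                Collections.singletonMap(partition, batch);
        return new ConsumerRecords<>(byPartition);
    }

    public static void main(String[] args) {
        ConsumerRecords<String, String> records = makeRecords("Test");
        System.out.println("stubbed records: " + records.count()); // prints 1
    }
}

PowerMockito.whenNew(...), as used in setUp() below, only intercepts construction inside classes listed in @PrepareForTest, which is why the step class itself is prepared.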
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
+
 @PowerMockIgnore("javax.management.*")
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({Consumer.class})
-public class KafkaConsumerTest {
-
+@PrepareForTest({KafkaConsumer.class})
+public class KafkaConsumerTest
+{
     private static final String STEP_NAME = "Kafka Step";
-    private static final String STEP_LIMIT = "10000";
-
-    @Mock
-    private HashMap<String, List<KafkaStream<byte[], byte[]>>> streamsMap;
-    @Mock
-    private KafkaStream<byte[], byte[]> kafkaStream;
-    @Mock
-    private ZookeeperConsumerConnector zookeeperConsumerConnector;
-    @Mock
-    private ConsumerIterator<byte[], byte[]> streamIterator;
-    @Mock
-    private ArrayList<KafkaStream<byte[], byte[]>> stream;
-
+    private static final String STEP_TOPIC = "Test";
+    private static final String STEP_LIMIT = "10";
+    private org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer;
     private StepMeta stepMeta;
     private KafkaConsumerMeta meta;
     private KafkaConsumerData data;
     private TransMeta transMeta;
     private Trans trans;
+
     @BeforeClass
-    public static void setUpBeforeClass() throws KettleException {
+    public static void setUpBeforeClass() throws KettleException
+    {
         KettleEnvironment.init(false);
     }

     @Before
-    public void setUp() {
+    public void setUp() throws Exception
+    {
         data = new KafkaConsumerData();
         meta = new KafkaConsumerMeta();
-        meta.setKafkaProperties(getDefaultKafkaProperties());
-        meta.setLimit(STEP_LIMIT);
+
+        // Default values.
+        meta.setTopic(STEP_TOPIC);
+        meta.setTimeout("0");
+        meta.setLimit("0");
+        meta.getKafkaProperties().setProperty("bootstrap.servers", meta.defaultProp("bootstrap.servers"));
+        meta.getKafkaProperties().setProperty("key.deserializer", meta.defaultProp("key.deserializer"));
+        meta.getKafkaProperties().setProperty("value.deserializer", meta.defaultProp("value.deserializer"));

         stepMeta = new StepMeta("KafkaConsumer", meta);
         transMeta = new TransMeta();
         transMeta.addStep(stepMeta);
         trans = new Trans(transMeta);

-        PowerMockito.mockStatic(Consumer.class);
-
-        when(Consumer.createJavaConsumerConnector(any(ConsumerConfig.class))).thenReturn(zookeeperConsumerConnector);
-        when(zookeeperConsumerConnector.createMessageStreams(anyMapOf(String.class, Integer.class))).thenReturn(streamsMap);
-        when(streamsMap.get(anyObject())).thenReturn(stream);
-        when(stream.get(anyInt())).thenReturn(kafkaStream);
-        when(kafkaStream.iterator()).thenReturn(streamIterator);
-        when(streamIterator.next()).thenReturn(generateKafkaMessage());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void stepInitConfigIssue() throws Exception {
-        KafkaConsumer step = new KafkaConsumer(stepMeta, data, 1, transMeta, trans);
-        meta.setKafkaProperties(new Properties());
-
-        step.init(meta, data);
+        // Mock Kafka consumer.
+        this.consumer = PowerMockito.mock(org.apache.kafka.clients.consumer.KafkaConsumer.class);
+        PowerMockito.whenNew(org.apache.kafka.clients.consumer.KafkaConsumer.class)
+                .withParameterTypes(Properties.class)
+                .withArguments(any(Properties.class))
+                .thenReturn(this.consumer);
     }

     @Test(expected = KettleException.class)
-    public void illegalTimeout() throws KettleException {
+    public void illegalTimeout() throws KettleException
+    {
         meta.setTimeout("aaa");
         TransMeta tm = TransTestFactory.generateTestTransformation(new Variables(), meta, STEP_NAME);
+        PowerMockito.when(this.consumer.poll(anyLong())).thenReturn(ConsumerRecords.empty());

         TransTestFactory.executeTestTransformation(tm, TransTestFactory.INJECTOR_STEPNAME,
                 STEP_NAME, TransTestFactory.DUMMY_STEPNAME, new ArrayList<RowMetaAndData>());
@@ -107,9 +96,11 @@ public void illegalTimeout() throws KettleException {
     }

     @Test(expected = KettleException.class)
-    public void invalidLimit() throws KettleException {
+    public void invalidLimit() throws KettleException
+    {
         meta.setLimit("aaa");
         TransMeta tm = TransTestFactory.generateTestTransformation(new Variables(), meta, STEP_NAME);
+        PowerMockito.when(this.consumer.poll(anyLong())).thenReturn(ConsumerRecords.empty());

         TransTestFactory.executeTestTransformation(tm, TransTestFactory.INJECTOR_STEPNAME,
                 STEP_NAME, TransTestFactory.DUMMY_STEPNAME, new ArrayList<RowMetaAndData>());
@@ -117,26 +108,14 @@ public void invalidLimit() throws KettleException {
         fail("Invalid limit value should lead to exception");
     }

+    // If the step does not receive any rows, the transformation should run successfully
     @Test
-    public void withStopOnEmptyTopic() throws KettleException {
+    public void testNoInput() throws KettleException
+    {
+        meta.setTimeout("1000");

-        meta.setStopOnEmptyTopic(true);
-        TransMeta tm = TransTestFactory.generateTestTransformation(new Variables(), meta, STEP_NAME);
-
-        TransTestFactory.executeTestTransformation(tm, TransTestFactory.INJECTOR_STEPNAME,
-                STEP_NAME, TransTestFactory.DUMMY_STEPNAME, new ArrayList<RowMetaAndData>());
-
-        PowerMockito.verifyStatic();
-        ArgumentCaptor<ConsumerConfig> consumerConfig = ArgumentCaptor.forClass(ConsumerConfig.class);
-        Consumer.createJavaConsumerConnector(consumerConfig.capture());
-
-        assertEquals(1000, consumerConfig.getValue().consumerTimeoutMs());
-    }
-
-    // If the step does not receive any rows, the transformation should still run successfully
-    @Test
-    public void testNoInput() throws KettleException {
         TransMeta tm = TransTestFactory.generateTestTransformation(new Variables(), meta, STEP_NAME);
+        PowerMockito.when(this.consumer.poll(anyLong())).thenReturn(ConsumerRecords.empty());

         List<RowMetaAndData> result = TransTestFactory.executeTestTransformation(tm, TransTestFactory.INJECTOR_STEPNAME,
                 STEP_NAME, TransTestFactory.DUMMY_STEPNAME, new ArrayList<RowMetaAndData>());
@@ -147,13 +126,14 @@ public void testNoInput() throws KettleException {

     // If the step receives rows without any fields, there should be two output fields (key + value) on each row
     @Test
-    public void testInputNoFields() throws KettleException {
+    public void testInputNoFields() throws KettleException
+    {
         meta.setKeyField("aKeyField");
         meta.setField("aField");
-
-        when(streamIterator.hasNext()).thenReturn(true);
+        meta.setLimit(STEP_LIMIT);

         TransMeta tm = TransTestFactory.generateTestTransformation(new Variables(), meta, STEP_NAME);
+        PowerMockito.when(this.consumer.poll(anyLong())).thenReturn(this.generateRecords());

         List<RowMetaAndData> result = TransTestFactory.executeTestTransformation(tm, TransTestFactory.INJECTOR_STEPNAME,
                 STEP_NAME, TransTestFactory.DUMMY_STEPNAME, generateInputData(2, false));
@@ -162,45 +142,18 @@ public void testInputNoFields() throws KettleException {
         assertEquals(Integer.parseInt(STEP_LIMIT), result.size());
         for (int i = 0; i < Integer.parseInt(STEP_LIMIT); i++) {
             assertEquals(2, result.get(i).size());
-            assertEquals("aMessage", result.get(i).getString(0, "default value"));
+            assertEquals("value", result.get(i).getString(1, "null"));
         }
     }

-    // If the step receives rows without any fields, there should be a two output fields (key + value) on each row
-    @Test
-    public void testInputFields() throws KettleException {
-        meta.setKeyField("aKeyField");
-        meta.setField("aField");
-
-        when(streamIterator.hasNext()).thenReturn(true);
-
-        TransMeta tm = TransTestFactory.generateTestTransformation(new Variables(), meta, STEP_NAME);
-
-        List<RowMetaAndData> result = TransTestFactory.executeTestTransformation(tm, TransTestFactory.INJECTOR_STEPNAME,
-                STEP_NAME, TransTestFactory.DUMMY_STEPNAME, generateInputData(3, true));
-
-        assertNotNull(result);
-        assertEquals(Integer.parseInt(STEP_LIMIT), result.size());
-        for (int i = 0; i < Integer.parseInt(STEP_LIMIT); i++) {
-            assertEquals(3, result.get(i).size());
-            assertEquals("aMessage", result.get(i).getString(1, "default value"));
-        }
-    }
-
-    private static Properties getDefaultKafkaProperties() {
-        Properties p = new Properties();
-        p.put("zookeeper.connect", "");
-        p.put("group.id", "");
-
-        return p;
-    }

     /**
      * @param rowCount  The number of rows that should be returned
      * @param hasFields Whether a "UUID" field should be added to each row
      * @return A RowMetaAndData object that can be used for input data in a test transformation
      */
-    private static List<RowMetaAndData> generateInputData(int rowCount, boolean hasFields) {
+    private static List<RowMetaAndData> generateInputData(int rowCount, boolean hasFields)
+    {
         List<RowMetaAndData> retval = new ArrayList<RowMetaAndData>();
         RowMetaInterface rowMeta = new RowMeta();
         if (hasFields) {
@@ -217,11 +170,15 @@ private static List<RowMetaAndData> generateInputData(int rowCount, boolean hasF
         return retval;
     }

-    private static MessageAndMetadata<byte[], byte[]> generateKafkaMessage() {
-        byte[] message = "aMessage".getBytes();
-
-        return new MessageAndMetadata<byte[], byte[]>("topic", 0, new Message(message),
-                0, new DefaultDecoder(null), new DefaultDecoder(null));
+    private ConsumerRecords<String, String> generateRecords()
+    {
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> recordsHM = new HashMap<>();
+        ConsumerRecord<String, String> record = new ConsumerRecord<>(STEP_TOPIC, 0, 0, "key", "value");
+        TopicPartition tp = new TopicPartition(STEP_TOPIC, 0);
+        List<ConsumerRecord<String, String>> list = new ArrayList<>();
+        list.add(record);
+        recordsHM.put(tp, list);
+        return new ConsumerRecords<>(recordsHM);
     }
-}
+}
\ No newline at end of file

From f080752ae248aba67b156fe7a9686e46d6b3b725 Mon Sep 17 00:00:00 2001
From: Miguel Angel Garcia <92518@globalia.com>
Date: Mon, 25 May 2020 12:47:43 +0200
Subject: [PATCH 2/3] Version Release 2.0.0

---
 pom.xml                        | 4 ++--
 src/main/resources/version.xml | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3559e31..1335bf7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,10 +1,10 @@
-	<modelVersion>4.0.0</modelVersion>
+
 	<groupId>com.ruckuswireless</groupId>
 	<artifactId>pentaho-kafka-consumer</artifactId>
-	<version>2.0.0-SNAPSHOT</version>
+	<version>2.0.0</version>
 	<name>Apache Kafka Consumer Plug-In for Pentaho</name>

diff --git a/src/main/resources/version.xml b/src/main/resources/version.xml
index 036224f..2aa6441 100644
--- a/src/main/resources/version.xml
+++ b/src/main/resources/version.xml
@@ -1 +1 @@
-<version>TRUNK-SNAPSHOT</version>
+<version>${pom.version}</version>

From 0aec828f194f4ef8df605b0d42525a8a498e88f1 Mon Sep 17 00:00:00 2001
From: Miguel Angel Garcia Reguera
Date: Mon, 30 Nov 2020 20:20:52 +0100
Subject: [PATCH 3/3] Restore file for CI build

---
 .travis.yml | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 .travis.yml

diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..6a2ce34
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,3 @@
+language: java
+notifications:
+  email: false
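
Reviewer note, not part of the patch: the refactored tests never talk to a broker. PowerMock's whenNew swaps the real org.apache.kafka.clients.consumer.KafkaConsumer for a mock whose poll() returns either ConsumerRecords.empty() or the fixture from generateRecords(). Below is a minimal, self-contained sketch of that fixture pattern against the Kafka 1.1.0 client API; the <String, String> type parameters and the helper name RecordsFixture are illustrative assumptions inferred from the string key/value used in the tests, not code from this patch.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical helper: builds the kind of single-record ConsumerRecords
    // fixture that a mocked poll() can return in tests like the ones above.
    public final class RecordsFixture {

        public static ConsumerRecords<String, String> singleRecord(String topic) {
            TopicPartition tp = new TopicPartition(topic, 0);
            // Five-argument constructor: (topic, partition, offset, key, value).
            ConsumerRecord<String, String> record =
                    new ConsumerRecord<>(topic, 0, 0L, "key", "value");
            // ConsumerRecords is constructed from a partition -> records map.
            Map<TopicPartition, List<ConsumerRecord<String, String>>> byPartition =
                    Collections.singletonMap(tp, Collections.singletonList(record));
            return new ConsumerRecords<>(byPartition);
        }
    }

A test would then stub the mocked consumer with, e.g., PowerMockito.when(consumer.poll(anyLong())).thenReturn(RecordsFixture.singleRecord("Test")), which is exactly the shape of the stubbing used in testInputNoFields above.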