diff --git a/.classpath b/.classpath
deleted file mode 100644
index 6a6c8e3..0000000
--- a/.classpath
+++ /dev/null
@@ -1,31 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/.dir-locals.el b/.dir-locals.el
deleted file mode 100644
index 15aae5e..0000000
--- a/.dir-locals.el
+++ /dev/null
@@ -1,4 +0,0 @@
-((nil . ((indent-tabs-mode . t)
- (tab-width . 4)
- (c-basic-offset . 4)
- (fill-column . 80))))
diff --git a/.project b/.project
deleted file mode 100644
index 904397b..0000000
--- a/.project
+++ /dev/null
@@ -1,23 +0,0 @@
-
-
- pentaho-kafka-consumer
-
-
-
-
-
- org.eclipse.jdt.core.javabuilder
-
-
-
-
- org.eclipse.m2e.core.maven2Builder
-
-
-
-
-
- org.eclipse.jdt.core.javanature
- org.eclipse.m2e.core.maven2Nature
-
-
diff --git a/README.md b/README.md
index b080c83..9fc1c34 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@ Apache Kafka consumer step plug-in for Pentaho Kettle.
### Apache Kafka Compatibility ###
-The consumer depends on Apache Kafka 0.8.1.1, which means that the broker must be of 0.8.x version or later.
+The consumer depends on Apache Kafka 1.1.0. Check broker version to ensure compatibility.
If you want to build the plugin for a different Kafka version you have to
modify the values of kafka.version and kafka.scala.version in the properties
diff --git a/pom.xml b/pom.xml
index 9403558..1335bf7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,37 +1,48 @@
-
4.0.0
+
com.ruckuswireless
pentaho-kafka-consumer
- TRUNK-SNAPSHOT
+ 2.0.0
Apache Kafka Consumer Plug-In for Pentaho
UTF-8
- 1.6
- 1.6
- 7.1.0.0-12
- 2.10
- 0.8.2.1
- ${maven.build.timestamp}
+ 1.8
+ 1.8
+ 9.0.0.0-423
+ 2.12
+ 1.1.0
4.12
1.6.6
+ ${maven.build.timestamp}
yyyyMMdd-HHmm
pentaho-releases
- http://nexus.pentaho.org/content/groups/omni
+ https://nexus.pentaho.org/repository/omni/
+
+
+ maven-eclipse-repo
+ http://maven-eclipse.github.io/maven
-
- maven-eclipse-repo
- http://maven-eclipse.github.io/maven
-
+
+ org.slf4j
+ slf4j-api
+ 1.7.25
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.25
+
+
org.apache.kafka
kafka_${kafka.scala.version}
@@ -51,6 +62,7 @@
+
pentaho-kettle
kettle-core
@@ -69,10 +81,11 @@
${kettle.version}
provided
+
pentaho-kettle
kettle-engine-test
- ${kettle.version}
+ 7.1.0.32-246
test
@@ -97,7 +110,9 @@
+
${project.artifactId}
+
src/main/resources
@@ -116,17 +131,7 @@
-
- org.pitest
- pitest-maven
- 1.2.3
-
-
- XML
- HTML
-
-
-
+
maven-dependency-plugin
@@ -143,7 +148,6 @@
-
org.apache.maven.plugins
maven-assembly-plugin
@@ -165,6 +169,18 @@
+
+
+ org.pitest
+ pitest-maven
+ 1.2.3
+
+
+ XML
+ HTML
+
+
+
org.sonarsource.scanner.maven
sonar-maven-plugin
diff --git a/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumer.java b/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumer.java
index db46c6e..7350963 100644
--- a/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumer.java
+++ b/src/main/java/org/pentaho/di/trans/kafka/consumer/KafkaConsumer.java
@@ -1,187 +1,209 @@
package org.pentaho.di.trans.kafka.consumer;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.pentaho.di.core.exception.KettleException;
+import org.pentaho.di.core.exception.KettleStepException;
+import org.pentaho.di.core.exception.KettleValueException;
import org.pentaho.di.core.row.RowDataUtil;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
-import org.pentaho.di.trans.step.*;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.pentaho.di.trans.step.BaseStep;
+import org.pentaho.di.trans.step.StepDataInterface;
+import org.pentaho.di.trans.step.StepInterface;
+import org.pentaho.di.trans.step.StepMeta;
+import org.pentaho.di.trans.step.StepMetaInterface;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
import java.util.Map.Entry;
import java.util.Properties;
-import java.util.concurrent.*;
/**
* Kafka Consumer step processor
*
* @author Michael Spector
+ * @author Miguel Ángel García
*/
-public class KafkaConsumer extends BaseStep implements StepInterface {
- public static final String CONSUMER_TIMEOUT_KEY = "consumer.timeout.ms";
+public class KafkaConsumer extends BaseStep implements StepInterface
+{
+ private KafkaConsumerMeta meta;
+ private KafkaConsumerData data;
+
- public KafkaConsumer(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
- Trans trans) {
+ /**
+ * Default step constructor.
+ */
+ public KafkaConsumer(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, Trans trans)
+ {
super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
}
- public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
- super.init(smi, sdi);
- KafkaConsumerMeta meta = (KafkaConsumerMeta) smi;
- KafkaConsumerData data = (KafkaConsumerData) sdi;
-
- Properties properties = meta.getKafkaProperties();
- Properties substProperties = new Properties();
- for (Entry