diff --git a/pom.xml b/pom.xml
index 668e0ed..b508a8a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>com.github.jcustenborder.kafka.connect</groupId>
     <artifactId>kafka-connect-parent</artifactId>
-    <version>2.0.0-cp1</version>
+    <version>2.1.1-cp1</version>
   </parent>
   <artifactId>kafka-connect-transform-common</artifactId>
   <version>0.1.0-SNAPSHOT</version>
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/Compress.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/Compress.java
new file mode 100644
index 0000000..6f27638
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/Compress.java
@@ -0,0 +1,81 @@
+/**
+ * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.transform.common;
+
+import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation;
+import com.google.common.io.ByteStreams;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Base64;
+import java.util.Map;
+
+public abstract class Compress<R extends ConnectRecord<R>> extends Compress.BaseTransformation<R> {
+  public Compress(boolean isKey) {
+    super(isKey);
+  }
+
+  @Override
+  public ConfigDef config() {
+    return new ConfigDef();
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  protected abstract OutputStream createStream(OutputStream input) throws IOException;
+
+  @Override
+  public void configure(Map<String, ?> map) {
+
+  }
+
+  @Override
+  protected SchemaAndValue processString(R record, Schema inputSchema, String base64Input) {
+    byte[] input = Base64.getDecoder().decode(base64Input);
+    Schema bytesSchema = inputSchema.isOptional() ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA;
+    SchemaAndValue compressed = processBytes(record, bytesSchema, input);
+    String result = Base64.getEncoder().encodeToString((byte[]) compressed.value());
+    return new SchemaAndValue(inputSchema, result);
+  }
+
+  @Override
+  protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) {
+    try (InputStream inputStream = new ByteArrayInputStream(input)) {
+      try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+        try (OutputStream compressStream = createStream(outputStream)) {
+          ByteStreams.copy(inputStream, compressStream);
+          compressStream.flush();
+          return new SchemaAndValue(inputSchema, outputStream.toByteArray());
+        }
+      }
+    } catch (IOException ex) {
+      throw new DataException(ex);
+    }
+  }
+}
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/Decompress.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/Decompress.java
new file mode 100644
index 0000000..0c16037
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/Decompress.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.transform.common;
+
+import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation;
+import com.google.common.io.ByteStreams;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Base64;
+import java.util.Map;
+
+public abstract class Decompress<R extends ConnectRecord<R>> extends BaseKeyValueTransformation<R> {
+  public Decompress(boolean isKey) {
+    super(isKey);
+  }
+
+  @Override
+  public ConfigDef config() {
+    return new ConfigDef();
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  protected abstract InputStream createStream(InputStream input) throws IOException;
+
+  @Override
+  public void configure(Map<String, ?> map) {
+
+  }
+
+  @Override
+  protected SchemaAndValue processString(R record, Schema inputSchema, String base64Input) {
+    byte[] input = Base64.getDecoder().decode(base64Input);
+    Schema bytesSchema = inputSchema.isOptional() ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA;
+    SchemaAndValue compressed = processBytes(record, bytesSchema, input);
+    String result = Base64.getEncoder().encodeToString((byte[]) compressed.value());
+    return new SchemaAndValue(inputSchema, result);
+  }
+
+  @Override
+  protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) {
+    try (InputStream inputStream = new ByteArrayInputStream(input)) {
+      try (InputStream decompressStream = createStream(inputStream)) {
+        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+          ByteStreams.copy(decompressStream, outputStream);
+          return new SchemaAndValue(inputSchema, outputStream.toByteArray());
+        }
+      }
+    } catch (IOException ex) {
+      throw new DataException(ex);
+    }
+  }
+}
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ExtractTimestamp.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ExtractTimestamp.java
index 41c3c98..c46870e 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ExtractTimestamp.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/ExtractTimestamp.java
@@ -33,6 +33,8 @@
 import java.util.Date;
 import java.util.Map;
 
+@Title("ExtractTimestamp")
+@Description("This transformation is used to use a field from the input data to override the timestamp for the record.")
 public abstract class ExtractTimestamp<R extends ConnectRecord<R>> implements Transformation<R> {
   private static final Logger log = LoggerFactory.getLogger(ExtractTimestamp.class);
   public ExtractTimestampConfig config;
@@ -127,8 +129,23 @@ public void configure(Map<String, ?> settings) {
   }
 
 
-  @Title("ExtractTimestamp(Value)")
-  @Description("This transformation is used to use a field from the input data to override the timestamp for the record.")
+  public static class Key<R extends ConnectRecord<R>> extends ExtractTimestamp<R> {
+
+    @Override
+    public R apply(R r) {
+      final long timestamp = process(new SchemaAndValue(r.keySchema(), r.key()));
+      return r.newRecord(
+          r.topic(),
+          r.kafkaPartition(),
+          r.keySchema(),
+          r.key(),
+          r.valueSchema(),
+          r.value(),
+          timestamp
+      );
+    }
+  }
+
 public static class Value<R extends ConnectRecord<R>> extends ExtractTimestamp<R> {
 
   @Override
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/GzipCompress.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/GzipCompress.java
new file mode 100644
index 0000000..a2aca48
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/GzipCompress.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.transform.common;
+
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.GZIPOutputStream;
+
+public abstract class GzipCompress<R extends ConnectRecord<R>> extends Compress<R> {
+  public GzipCompress(boolean isKey) {
+    super(isKey);
+  }
+
+  @Override
+  protected OutputStream createStream(OutputStream input) throws IOException {
+    return new GZIPOutputStream(input);
+  }
+
+  public static class Key<R extends ConnectRecord<R>> extends GzipCompress<R> {
+    public Key() {
+      super(true);
+    }
+  }
+
+  public static class Value<R extends ConnectRecord<R>> extends GzipCompress<R> {
+    public Value() {
+      super(false);
+    }
+  }
+
+}
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/GzipDecompress.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/GzipDecompress.java
new file mode 100644
index 0000000..cccb399
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/GzipDecompress.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.transform.common;
+
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+
+public abstract class GzipDecompress<R extends ConnectRecord<R>> extends Decompress<R> {
+  public GzipDecompress(boolean isKey) {
+    super(isKey);
+  }
+
+  @Override
+  protected InputStream createStream(InputStream input) throws IOException {
+    return new GZIPInputStream(input);
+  }
+
+  public static class Key<R extends ConnectRecord<R>> extends GzipDecompress<R> {
+    public Key() {
+      super(true);
+    }
+  }
+
+  public static class Value<R extends ConnectRecord<R>> extends GzipDecompress<R> {
+    public Value() {
+      super(false);
+    }
+  }
+}
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/SetMaximumPrecision.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/SetMaximumPrecision.java
new file mode 100644
index 0000000..56a4ca4
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/SetMaximumPrecision.java
@@ -0,0 +1,173 @@
+/**
+ * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.transform.common;
+
+import com.github.jcustenborder.kafka.connect.utils.config.Description;
+import com.github.jcustenborder.kafka.connect.utils.config.DocumentationNote;
+import com.github.jcustenborder.kafka.connect.utils.config.Title;
+import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Title("SetMaximumPrecision")
+@Description("This transformation is used to ensure that all decimal fields in a struct are below the " +
+    "maximum precision specified.")
+@DocumentationNote("The Confluent AvroConverter uses a default precision of 64 which can be too large " +
+    "for some database systems.")
+public class SetMaximumPrecision<R extends ConnectRecord<R>> extends BaseKeyValueTransformation<R> {
+  private static final Logger log = LoggerFactory.getLogger(SetMaximumPrecision.class);
+
+  public SetMaximumPrecision(boolean isKey) {
+    super(isKey);
+  }
+
+  @Override
+  public ConfigDef config() {
+    return SetMaximumPrecisionConfig.config();
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  SetMaximumPrecisionConfig config;
+
+  static final State NOOP = new State(true, null, null);
+
+  static class State {
+    public final boolean noop;
+    public final Schema outputSchema;
+    public final Set<String> decimalFields;
+
+    State(boolean noop, Schema outputSchema, Set<String> decimalFields) {
+      this.noop = noop;
+      this.outputSchema = outputSchema;
+      this.decimalFields = decimalFields;
+    }
+  }
+
+  Map<Schema, State> schemaLookup = new HashMap<>();
+
+  static final String CONNECT_AVRO_DECIMAL_PRECISION_PROP = "connect.decimal.precision";
+
+  State state(Schema inputSchema) {
+    return this.schemaLookup.computeIfAbsent(inputSchema, new Function<Schema, State>() {
+      @Override
+      public State apply(Schema schema) {
+        Set<String> decimalFields = inputSchema.fields().stream()
+            .filter(f -> Decimal.LOGICAL_NAME.equals(f.schema().name()))
+            .filter(f -> Integer.parseInt(f.schema().parameters().getOrDefault(CONNECT_AVRO_DECIMAL_PRECISION_PROP, "64")) > config.maxPrecision)
+            .map(Field::name)
+            .collect(Collectors.toSet());
+        State result;
+
+        if (decimalFields.size() == 0) {
+          result = NOOP;
+        } else {
+          log.trace("state() - processing schema '{}'", schema.name());
+          SchemaBuilder builder = SchemaBuilder.struct()
+              .name(inputSchema.name())
+              .doc(inputSchema.doc())
+              .version(inputSchema.version());
+          if (null != inputSchema.parameters() && !inputSchema.parameters().isEmpty()) {
+            builder.parameters(inputSchema.parameters());
+          }
+
+          for (Field field : inputSchema.fields()) {
+            log.trace("state() - processing field '{}'", field.name());
+            if (decimalFields.contains(field.name())) {
+              Map<String, String> parameters = new LinkedHashMap<>();
+              if (null != field.schema().parameters() && !field.schema().parameters().isEmpty()) {
+                parameters.putAll(field.schema().parameters());
+              }
+              parameters.put(CONNECT_AVRO_DECIMAL_PRECISION_PROP, Integer.toString(config.maxPrecision));
+              int scale = Integer.parseInt(parameters.get(Decimal.SCALE_FIELD));
+              SchemaBuilder fieldBuilder = Decimal.builder(scale)
+                  .parameters(parameters)
+                  .doc(field.schema().doc())
+                  .version(field.schema().version());
+              if (field.schema().isOptional()) {
+                fieldBuilder.optional();
+              }
+              Schema fieldSchema = fieldBuilder.build();
+              builder.field(field.name(), fieldSchema);
+            } else {
+              log.trace("state() - copying field '{}' to new schema.", field.name());
+              builder.field(field.name(), field.schema());
+            }
+          }
+
+          Schema outputSchema = builder.build();
+          result = new State(false, outputSchema, decimalFields);
+        }
+
+        return result;
+      }
+    });
+  }
+
+  @Override
+  protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
+    State state = state(inputSchema);
+    SchemaAndValue result;
+
+    if (state.noop) {
+      result = new SchemaAndValue(inputSchema, input);
+    } else {
+      Struct struct = new Struct(state.outputSchema);
+      for (Field field : inputSchema.fields()) {
+        struct.put(field.name(), input.get(field.name()));
+      }
+      result = new SchemaAndValue(state.outputSchema, struct);
+    }
+    return result;
+  }
+
+  @Override
+  public void configure(Map<String, ?> settings) {
+    this.config = new SetMaximumPrecisionConfig(settings);
+  }
+
+  public static class Key<R extends ConnectRecord<R>> extends SetMaximumPrecision<R> {
+    public Key() {
+      super(true);
+    }
+  }
+
+  public static class Value<R extends ConnectRecord<R>> extends SetMaximumPrecision<R> {
+    public Value() {
+      super(false);
+    }
+  }
+}
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/SetMaximumPrecisionConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/SetMaximumPrecisionConfig.java
new file mode 100644
index 0000000..3b78f2d
--- /dev/null
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/SetMaximumPrecisionConfig.java
@@ -0,0 +1,45 @@
+/**
+ * Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.github.jcustenborder.kafka.connect.transform.common;
+
+import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Map;
+
+public class SetMaximumPrecisionConfig extends AbstractConfig {
+  public final int maxPrecision;
+
+  public SetMaximumPrecisionConfig(Map<String, ?> originals) {
+    super(config(), originals);
+    this.maxPrecision = getInt(MAX_PRECISION_CONFIG);
+  }
+
+  public static final String MAX_PRECISION_CONFIG = "precision.max";
+  static final String MAX_PRECISION_DOC = "The maximum precision allowed.";
+
+  public static ConfigDef config() {
+    return new ConfigDef()
+        .define(
+            ConfigKeyBuilder.of(MAX_PRECISION_CONFIG, ConfigDef.Type.INT)
+                .documentation(MAX_PRECISION_DOC)
+                .importance(ConfigDef.Importance.HIGH)
+                .validator(ConfigDef.Range.between(1, 64))
+                .build()
+        );
+  }
+}
diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/package-info.java b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/package-info.java
index fcb3fa9..3cc047a 100644
--- a/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/package-info.java
+++ b/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/package-info.java
@@ -16,7 +16,11 @@
 @Introduction("\n" +
     "This project contains common transformations for every day use cases with Kafka Connect.")
 @Title("Common Transformations")
+@PluginName("kafka-connect-transform-common")
+@PluginOwner("jcustenborder")
 package com.github.jcustenborder.kafka.connect.transform.common;
 
 import com.github.jcustenborder.kafka.connect.utils.config.Introduction;
-import com.github.jcustenborder.kafka.connect.utils.config.Title;
\ No newline at end of file
+import com.github.jcustenborder.kafka.connect.utils.config.Title;
+import com.github.jcustenborder.kafka.connect.utils.config.PluginName;
+import com.github.jcustenborder.kafka.connect.utils.config.PluginOwner;
\ No newline at end of file
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/DocumentationTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/DocumentationTest.java
index a1264e7..c57455e 100644
--- a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/DocumentationTest.java
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/DocumentationTest.java
@@ -18,8 +18,6 @@
 import com.github.jcustenborder.kafka.connect.utils.BaseDocumentationTest;
 
 public class DocumentationTest extends BaseDocumentationTest {
-  @Override
-  protected String[] packages() {
-    return new String[]{this.getClass().getPackage().getName()};
-  }
+
+
 }
diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/SetMaximumPrecisionTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/SetMaximumPrecisionTest.java
new file mode 100644
index 0000000..a8546c5
--- /dev/null
+++ b/src/test/java/com/github/jcustenborder/kafka/connect/transform/common/SetMaximumPrecisionTest.java
@@ -0,0 +1,105 @@
+package com.github.jcustenborder.kafka.connect.transform.common;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+
+import static com.github.jcustenborder.kafka.connect.utils.AssertStruct.assertStruct;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class SetMaximumPrecisionTest {
+  SinkRecord record(Struct struct) {
+    return new SinkRecord("test", 1, null, null, struct.schema(), struct, 1234L);
+  }
+
+  @Test
+  public void noop() {
+    Schema schema = SchemaBuilder.struct()
+        .field("first", Schema.STRING_SCHEMA)
+        .field("last", Schema.STRING_SCHEMA)
+        .field("email", Schema.STRING_SCHEMA)
+        .build();
+    Struct struct = new Struct(schema)
+        .put("first", "test")
+        .put("last", "user")
+        .put("email", "none@none.com");
+    SinkRecord record = record(struct);
+    SetMaximumPrecision.Value<SinkRecord> transform = new SetMaximumPrecision.Value<>();
+    transform.configure(
+        ImmutableMap.of(SetMaximumPrecisionConfig.MAX_PRECISION_CONFIG, 32)
+    );
+    SinkRecord actual = transform.apply(record);
+    assertNotNull(actual);
+    assertStruct((Struct) record.value(), (Struct) actual.value());
+  }
+
+  @Test
+  public void convert() {
+    final Schema inputSchema = SchemaBuilder.struct()
+        .field("first", Decimal.schema(5))
+        .field(
+            "second",
+            Decimal.builder(5)
+                .parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "16")
+                .optional()
+                .build()
+        )
+        .field(
+            "third",
+            Decimal.builder(5)
+                .parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "48")
+                .optional()
+                .build()
+        )
+        .build();
+    final Struct inputStruct = new Struct(inputSchema)
+        .put("first", BigDecimal.ONE)
+        .put("second", null)
+        .put("third", BigDecimal.ONE);
+    final Schema expectedSchema = SchemaBuilder.struct()
+        .field(
+            "first",
+            Decimal.builder(5)
+                .parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "32")
+                .build()
+        )
+        .field(
+            "second",
+            Decimal.builder(5)
+                .parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "16")
+                .optional()
+                .build()
+        )
+        .field(
+            "third",
+            Decimal.builder(5)
+                .parameter(SetMaximumPrecision.CONNECT_AVRO_DECIMAL_PRECISION_PROP, "32")
+                .optional()
+                .build()
+        )
+        .build();
+    final Struct expectedStruct = new Struct(expectedSchema)
+        .put("first", BigDecimal.ONE)
+        .put("second", null)
+        .put("third", BigDecimal.ONE);
+
+    SinkRecord record = record(inputStruct);
+    SetMaximumPrecision.Value<SinkRecord> transform = new SetMaximumPrecision.Value<>();
+    transform.configure(
+        ImmutableMap.of(SetMaximumPrecisionConfig.MAX_PRECISION_CONFIG, 32)
+    );
+
+    SinkRecord actual = transform.apply(record);
+    assertNotNull(actual);
+    assertStruct(expectedStruct, (Struct) actual.value());
+  }
+}
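
Note for reviewers: a minimal sketch of how the new compression SMTs introduced by this patch might be wired into a connector. The transformation class names come from the patch; the transform alias and the surrounding connector configuration are hypothetical.

    # hypothetical connector properties — compress the record value with GZIP before it reaches the sink
    transforms=gzip
    transforms.gzip.type=com.github.jcustenborder.kafka.connect.transform.common.GzipCompress$Value

GzipDecompress$Value (or the $Key variants) would be configured the same way on the reading side. Per Compress.processString/processBytes above, both transforms operate on BYTES payloads directly and treat STRING payloads as Base64-encoded bytes, re-encoding the result as Base64.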
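Similarly, a sketch of how SetMaximumPrecision might be applied, using the precision.max key defined in SetMaximumPrecisionConfig (the alias is again hypothetical):

    transforms=maxPrecision
    transforms.maxPrecision.type=com.github.jcustenborder.kafka.connect.transform.common.SetMaximumPrecision$Value
    transforms.maxPrecision.precision.max=32

As implemented in state(), any Decimal field whose connect.decimal.precision parameter exceeds the configured maximum (or is absent, which is treated as 64) is rebuilt with the configured precision; all other fields pass through unchanged.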