diff --git a/src/main/java/io/aiven/kafka/connect/debezium/converters/MoneyConverter.java b/src/main/java/io/aiven/kafka/connect/debezium/converters/MoneyConverter.java
index efa2e10..aa5b7f1 100644
--- a/src/main/java/io/aiven/kafka/connect/debezium/converters/MoneyConverter.java
+++ b/src/main/java/io/aiven/kafka/connect/debezium/converters/MoneyConverter.java
@@ -17,6 +17,7 @@ package io.aiven.kafka.connect.debezium.converters;
 
 import java.math.BigDecimal;
+import java.util.Locale;
 import java.util.Properties;
 
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -51,9 +52,9 @@ public void converterFor(final RelationalColumn column,
             }
 
             if (data instanceof BigDecimal) { // Expected type
-                return String.format("%.2f", data);
+                return String.format(Locale.ROOT, "%.2f", data);
             } else if (data instanceof Number) {
-                return String.format("%.2f", ((Number) data).floatValue());
+                return String.format(Locale.ROOT, "%.2f", ((Number) data).floatValue());
             } else {
                 throw new IllegalArgumentException("Money type should have BigDecimal type");
             }
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/ConcatFields.java b/src/main/java/io/aiven/kafka/connect/transforms/ConcatFields.java
index 23d23c3..8c32fbc 100644
--- a/src/main/java/io/aiven/kafka/connect/transforms/ConcatFields.java
+++ b/src/main/java/io/aiven/kafka/connect/transforms/ConcatFields.java
@@ -77,13 +77,10 @@ public R apply(final R record) {
             struct.schema().fields().forEach(field -> {
                 newStruct.put(field.name(), struct.get(field));
             });
-            config.fieldNames().forEach(field -> {
+            config.fields().forEach(field -> {
                 try {
-                    if (struct.get(field) == null) {
-                        outputValue.add(config.fieldReplaceMissing());
-                    } else {
-                        outputValue.add(struct.get(field).toString());
-                    }
+                    final String value = field.readAsString(struct).orElse(config.fieldReplaceMissing());
+                    outputValue.add(value);
                 } catch (final DataException e) {
                     log.debug("{} is missing, concat will use {}", field, config.fieldReplaceMissing());
                     outputValue.add(config.fieldReplaceMissing());
@@ -94,12 +91,9 @@ public R apply(final R record) {
         } else if (schemaAndValue.value() instanceof Map) {
             final Map<String, Object> newValue = new HashMap<>((Map<String, Object>) schemaAndValue.value());
             final StringJoiner outputValue = new StringJoiner(config.delimiter());
-            config.fieldNames().forEach(field -> {
-                if (newValue.get(field) == null) {
-                    outputValue.add(config.fieldReplaceMissing());
-                } else {
-                    outputValue.add(newValue.get(field).toString());
-                }
+            config.fields().forEach(field -> {
+                final String value = field.readAsString(newValue).orElse(config.fieldReplaceMissing());
+                outputValue.add(value);
             });
 
             newValue.put(config.outputFieldName(), outputValue.toString());
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/ConcatFieldsConfig.java b/src/main/java/io/aiven/kafka/connect/transforms/ConcatFieldsConfig.java
index b647be7..5142019 100644
--- a/src/main/java/io/aiven/kafka/connect/transforms/ConcatFieldsConfig.java
+++ b/src/main/java/io/aiven/kafka/connect/transforms/ConcatFieldsConfig.java
@@ -18,10 +18,13 @@
 
 import java.util.List;
 import java.util.Map;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 
+import static java.util.stream.Collectors.toList;
+
 final class ConcatFieldsConfig extends AbstractConfig {
     public static final String FIELD_NAMES_CONFIG = "field.names";
     private static final String FIELD_NAMES_DOC =
@@ -69,8 +73,9 @@ static ConfigDef config() {
                 DELIMITER_DOC);
     }
 
-    final List<String> fieldNames() {
-        return getList(FIELD_NAMES_CONFIG);
+    final List<CursorField> fields() {
+        return getList(FIELD_NAMES_CONFIG).stream().map(CursorField::new)
+            .collect(toList());
     }
 
     final String outputFieldName() {
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestamp.java b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestamp.java
index a45ac0a..035549e 100644
--- a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestamp.java
+++ b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestamp.java
@@ -18,8 +18,10 @@
 
 import java.util.Date;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -44,31 +46,33 @@ public void configure(final Map<String, ?> configs) {
     @Override
     public R apply(final R record) {
         final SchemaAndValue schemaAndValue = getSchemaAndValue(record);
-
+        final String fieldName = config.field().getCursor();
         if (schemaAndValue.value() == null) {
             throw new DataException(keyOrValue() + " can't be null: " + record);
         }
 
-        final Object fieldValue;
+        final Optional<Object> fieldValueOpt;
         if (schemaAndValue.value() instanceof Struct) {
             final Struct struct = (Struct) schemaAndValue.value();
-            if (struct.schema().field(config.fieldName()) == null) {
-                throw new DataException(config.fieldName() + " field must be present and its value can't be null: "
+            if (config.field().read(struct.schema()) == null) {
+                throw new DataException(fieldName + " field must be present and its value can't be null: "
                     + record);
             }
-            fieldValue = struct.get(config.fieldName());
+            fieldValueOpt = config.field().read(struct);
         } else if (schemaAndValue.value() instanceof Map) {
             final Map<String, Object> map = (Map<String, Object>) schemaAndValue.value();
-            fieldValue = map.get(config.fieldName());
+            fieldValueOpt = config.field().read(map);
         } else {
             throw new DataException(keyOrValue() + " type must be STRUCT or MAP: " + record);
         }
 
-        if (fieldValue == null) {
-            throw new DataException(config.fieldName() + " field must be present and its value can't be null: "
+        if (fieldValueOpt.isEmpty()) {
+            throw new DataException(fieldName + " field must be present and its value can't be null: "
                 + record);
         }
 
+        final Object fieldValue = fieldValueOpt.get();
+
         final long newTimestamp;
         if (fieldValue instanceof Long) {
             final var longFieldValue = (long) fieldValue;
@@ -81,7 +85,7 @@ public R apply(final R record) {
             final var dateFieldValue = (Date) fieldValue;
             newTimestamp = dateFieldValue.getTime();
         } else {
-            throw new DataException(config.fieldName()
+            throw new DataException(fieldName
                 + " field must be INT64 or org.apache.kafka.connect.data.Timestamp: " + record);
         }
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfig.java b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfig.java
index e859798..5a78179 100644
--- a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfig.java
+++ b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfig.java
@@ -18,8 +18,10 @@
 
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
@@ -100,8 +102,8 @@ public void ensureValid(final String name, final Object value) {
                 EPOCH_RESOLUTION_DOC);
     }
 
-    final String fieldName() {
-        return getString(FIELD_NAME_CONFIG);
+    final CursorField field() {
+        return new CursorField(getString(FIELD_NAME_CONFIG));
     }
 
     final TimestampResolution timestampResolution() {
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopic.java b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopic.java
index d2f9c71..168742f 100644
--- a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopic.java
+++ b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopic.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Optional;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
@@ -73,17 +74,17 @@ public R apply(final R record) {
         final Optional<String> newTopic;
 
         if (schemaAndValue.schema() == null) { // schemaless values (Map)
-            if (config.fieldName().isPresent()) {
+            if (config.field().isPresent()) {
                 newTopic = topicNameFromNamedFieldSchemaless(
-                    record.toString(), schemaAndValue.value(), config.fieldName().get());
+                    record.toString(), schemaAndValue.value(), config.field().get());
             } else {
                 newTopic = topicNameWithoutFieldNameSchemaless(
                     record.toString(), schemaAndValue.value());
             }
         } else { // schema-based values (Struct)
-            if (config.fieldName().isPresent()) {
+            if (config.field().isPresent()) {
                 newTopic = topicNameFromNamedFieldWithSchema(
-                    record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.fieldName().get());
+                    record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.field().get());
             } else {
                 newTopic = topicNameWithoutFieldNameWithSchema(
                     record.toString(), schemaAndValue.schema(), schemaAndValue.value());
@@ -91,8 +92,10 @@ public R apply(final R record) {
         }
 
         if (newTopic.isPresent()) {
+            final String appended = record.topic() + config.appendDelimiter() + newTopic.get();
+            final String newName = config.appendToExisting() ? appended : newTopic.get();
             return record.newRecord(
-                newTopic.get(),
+                newName,
                 record.kafkaPartition(),
                 record.keySchema(),
                 record.key(),
@@ -112,7 +115,7 @@ public R apply(final R record) {
 
     private Optional<String> topicNameFromNamedFieldSchemaless(final String recordStr,
                                                                final Object value,
-                                                               final String fieldName) {
+                                                               final CursorField field) {
         if (value == null) {
             throw new DataException(dataPlace() + " can't be null if field name is specified: " + recordStr);
         }
@@ -123,15 +126,15 @@ private Optional<String> topicNameFromNamedFieldSchemaless(final String recordSt
         @SuppressWarnings("unchecked")
        final Map<String, Object> valueMap = (Map<String, Object>) value;
 
-        final Optional<String> result = Optional.ofNullable(valueMap.get(fieldName))
-            .map(field -> {
-                if (!SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM.contains(field.getClass())) {
-                    throw new DataException(fieldName + " type in " + dataPlace()
+        final Optional<String> result = field.read(valueMap)
+            .map(fieldValue -> {
+                if (!SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM.contains(fieldValue.getClass())) {
+                    throw new DataException(field.getCursor() + " type in " + dataPlace()
                         + " " + value
                         + " must be " + SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM + ": " + recordStr);
                 }
-                return field;
+                return fieldValue;
             })
             .map(Object::toString);
 
         if (result.isPresent() && !result.get().equals("")) {
@@ -141,7 +144,7 @@ private Optional<String> topicNameFromNamedFieldSchemaless(final String recordSt
             if (config.skipMissingOrNull()) {
                 return Optional.empty();
             } else {
-                throw new DataException(fieldName + " in " + dataPlace() + " can't be null or empty: " + recordStr);
+                throw new DataException(field.getCursor() + " in " + dataPlace() + " can't be null or empty: " + recordStr);
             }
         }
     }
@@ -169,7 +172,7 @@ private Optional<String> topicNameWithoutFieldNameSchemaless(final String record
     private Optional<String> topicNameFromNamedFieldWithSchema(final String recordStr,
                                                                final Schema schema,
                                                                final Object value,
-                                                               final String fieldName) {
+                                                               final CursorField field) {
         if (Schema.Type.STRUCT != schema.type()) {
             throw new DataException(dataPlace() + " schema type must be STRUCT if field name is specified: "
                 + recordStr);
@@ -179,32 +182,31 @@ private Optional<String> topicNameFromNamedFieldWithSchema(final String recordSt
             throw new DataException(dataPlace() + " can't be null if field name is specified: " + recordStr);
         }
 
-        final Field field = schema.field(fieldName);
-        if (field == null) {
+        final Field fieldSchema = field.read(schema);
+        if (fieldSchema == null) {
             if (config.skipMissingOrNull()) {
                 return Optional.empty();
             } else {
-                throw new DataException(fieldName + " in " + dataPlace() + " schema can't be missing: " + recordStr);
+                throw new DataException(field.getCursor() + " in " + dataPlace() + " schema can't be missing: " + recordStr);
             }
         }
 
-        if (!SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM.contains(field.schema().type())) {
-            throw new DataException(fieldName + " schema type in " + dataPlace()
+        if (!SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM.contains(fieldSchema.schema().type())) {
+            throw new DataException(field.getCursor() + " schema type in " + dataPlace()
                 + " must be " + SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM + ": " + recordStr);
         }
 
         final Struct struct = (Struct) value;
-        final Optional<String> result = Optional.ofNullable(struct.get(fieldName))
-            .map(Object::toString);
+        final Optional<String> result = field.readAsString(struct);
         if (result.isPresent() && !result.get().equals("")) {
             return result;
         } else {
             if (config.skipMissingOrNull()) {
                 return Optional.empty();
             } else {
-                throw new DataException(fieldName + " in " + dataPlace() + " can't be null or empty: " + recordStr);
+                throw new DataException(field.getCursor() + " in " + dataPlace() + " can't be null or empty: " + recordStr);
             }
         }
     }
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopicConfig.java b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopicConfig.java
index 795dc5b..1dd40ef 100644
--- a/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopicConfig.java
+++ b/src/main/java/io/aiven/kafka/connect/transforms/ExtractTopicConfig.java
@@ -19,6 +19,7 @@
 import java.util.Map;
 import java.util.Optional;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 
@@ -33,6 +34,14 @@ class ExtractTopicConfig extends AbstractConfig {
         "In case the source of the new topic name is null or missing, "
             + "should a record be silently passed without transformation.";
 
+    public static final String APPEND_TO_ORIGINAL_TOPIC_NAME_CONFIG = "append.to.topic";
+    private static final String APPEND_TO_ORIGINAL_TOPIC_NAME_DOC =
+        "Whether to append the extracted value to the existing topic name to derive the new topic name.";
+
+    public static final String APPEND_DELIMITER_CONFIG = "append.to.topic.delimiter";
+    private static final String APPEND_DELIMITER_DOC =
+        "The delimiter inserted between the existing topic name and the appended value.";
+
     ExtractTopicConfig(final Map<String, ?> originals) {
         super(config(), originals);
     }
@@ -50,18 +59,38 @@ static ConfigDef config() {
                 ConfigDef.Type.BOOLEAN,
                 false,
                 ConfigDef.Importance.LOW,
-                SKIP_MISSING_OR_NULL_DOC);
+                SKIP_MISSING_OR_NULL_DOC)
+            .define(
+                APPEND_TO_ORIGINAL_TOPIC_NAME_CONFIG,
+                ConfigDef.Type.BOOLEAN,
+                false,
+                ConfigDef.Importance.LOW,
+                APPEND_TO_ORIGINAL_TOPIC_NAME_DOC)
+            .define(
+                APPEND_DELIMITER_CONFIG,
+                ConfigDef.Type.STRING,
+                "-",
+                ConfigDef.Importance.LOW,
+                APPEND_DELIMITER_DOC);
     }
 
-    Optional<String> fieldName() {
+    Optional<CursorField> field() {
         final String rawFieldName = getString(FIELD_NAME_CONFIG);
         if (null == rawFieldName || "".equals(rawFieldName)) {
             return Optional.empty();
         }
-        return Optional.of(rawFieldName);
+        return Optional.of(new CursorField(rawFieldName));
     }
 
     boolean skipMissingOrNull() {
         return getBoolean(SKIP_MISSING_OR_NULL_CONFIG);
     }
+
+    boolean appendToExisting() {
+        return getBoolean(APPEND_TO_ORIGINAL_TOPIC_NAME_CONFIG);
+    }
+
+    String appendDelimiter() {
+        return getString(APPEND_DELIMITER_CONFIG);
+    }
 }
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/FilterByFieldValue.java b/src/main/java/io/aiven/kafka/connect/transforms/FilterByFieldValue.java
index c61cfae..0dd310f 100644
--- a/src/main/java/io/aiven/kafka/connect/transforms/FilterByFieldValue.java
+++ b/src/main/java/io/aiven/kafka/connect/transforms/FilterByFieldValue.java
@@ -21,6 +21,8 @@
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
+
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
@@ -32,9 +34,11 @@
 import org.apache.kafka.connect.data.Values;
 import org.apache.kafka.connect.transforms.Transformation;
 
+import static java.util.function.Predicate.not;
+
 public abstract class FilterByFieldValue<R extends ConnectRecord<R>> implements Transformation<R> {
 
-    private String fieldName;
+    private Optional<CursorField> field;
     private Optional<String> fieldExpectedValue;
     private Optional<String> fieldValuePattern;
 
@@ -68,7 +72,9 @@ public ConfigDef config() {
     @Override
     public void configure(final Map<String, ?> configs) {
         final AbstractConfig config = new AbstractConfig(config(), configs);
-        this.fieldName = config.getString("field.name");
+        this.field = Optional.ofNullable(config.getString("field.name"))
+            .filter(not(String::isEmpty))
+            .map(CursorField::new);
         this.fieldExpectedValue = Optional.ofNullable(config.getString("field.value"));
         this.fieldValuePattern = Optional.ofNullable(config.getString("field.value.pattern"));
         final boolean expectedValuePresent = fieldExpectedValue.isPresent();
@@ -116,29 +122,26 @@ public R apply(final R record) {
 
     private R applyWithSchema(final R record) {
         final Struct struct = (Struct) operatingValue(record);
-        final SchemaAndValue schemaAndValue = getStructFieldValue(struct, fieldName).orElse(null);
+        final SchemaAndValue schemaAndValue = field.flatMap(f -> getStructFieldValue(struct, f)).orElse(null);
         return filterCondition.test(schemaAndValue) ? record : null;
     }
 
-    private Optional<SchemaAndValue> getStructFieldValue(final Struct struct, final String fieldName) {
-        final Schema schema = struct.schema();
-        final Field field = schema.field(fieldName);
-        final Object fieldValue = struct.get(field);
-        if (fieldValue == null) {
-            return Optional.empty();
-        } else {
-            return Optional.of(new SchemaAndValue(field.schema(), struct.get(field)));
-        }
+    private Optional<SchemaAndValue> getStructFieldValue(final Struct struct, final CursorField field) {
+        final Optional<Object> value = field.read(struct);
+        final Field fieldSchema = field.read(struct.schema());
+
+        return value.map(v -> new SchemaAndValue(fieldSchema.schema(), v));
     }
 
     @SuppressWarnings("unchecked")
     private R applySchemaless(final R record) {
-        if (fieldName == null || fieldName.isEmpty()) {
+        if (field.isEmpty()) {
             final SchemaAndValue schemaAndValue = getSchemalessFieldValue(operatingValue(record)).orElse(null);
             return filterCondition.test(schemaAndValue) ? record : null;
         } else {
             final Map<String, Object> map = (Map<String, Object>) operatingValue(record);
-            final SchemaAndValue schemaAndValue = getSchemalessFieldValue(map.get(fieldName)).orElse(null);
+            final Object value = field.get().read(map).orElse(null);
+            final SchemaAndValue schemaAndValue = getSchemalessFieldValue(value).orElse(null);
             return filterCondition.test(schemaAndValue) ? record : null;
         }
     }
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/Hash.java b/src/main/java/io/aiven/kafka/connect/transforms/Hash.java
index d2d7c4a..3e01189 100644
--- a/src/main/java/io/aiven/kafka/connect/transforms/Hash.java
+++ b/src/main/java/io/aiven/kafka/connect/transforms/Hash.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Optional;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
@@ -73,9 +74,9 @@ public void configure(final Map<String, ?> configs) {
     public final Optional<Object> getNewValue(final R record, final SchemaAndValue schemaAndValue) {
         final Optional<Object> newValue;
 
-        if (config.fieldName().isPresent()) {
+        if (config.field().isPresent()) {
             newValue = getNewValueForNamedField(
-                record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.fieldName().get());
+                record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.field().get());
         } else {
             newValue = getNewValueWithoutFieldName(
                 record.toString(), schemaAndValue.schema(), schemaAndValue.value());
@@ -94,7 +95,7 @@ public void close() {
     private Optional<Object> getNewValueForNamedField(final String recordStr,
                                                       final Schema schema,
                                                       final Object value,
-                                                      final String fieldName) {
+                                                      final CursorField field) {
         if (Schema.Type.STRUCT != schema.type()) {
             throw new DataException(dataPlace() + " schema type must be STRUCT if field name is specified: "
                 + recordStr);
@@ -104,34 +105,34 @@ private Optional<Object> getNewValueForNamedField(final String recordStr,
             throw new DataException(dataPlace() + " can't be null if field name is specified: " + recordStr);
         }
 
-        final Field field = schema.field(fieldName);
-        if (field == null) {
+        final Field fieldSchema = field.read(schema);
+        if (fieldSchema == null) {
             if (config.skipMissingOrNull()) {
-                log.debug(fieldName + " in " + dataPlace() + " schema is missing, skipping transformation");
+                log.debug(field.getCursor() + " in " + dataPlace() + " schema is missing, skipping transformation");
                 return Optional.empty();
             } else {
-                throw new DataException(fieldName + " in " + dataPlace() + " schema can't be missing: " + recordStr);
+                throw new DataException(field.getCursor() + " in " + dataPlace() + " schema can't be missing: " + recordStr);
            }
         }
 
-        if (field.schema().type() != Schema.Type.STRING) {
-            throw new DataException(fieldName + " schema type in " + dataPlace()
+        if (fieldSchema.schema().type() != Schema.Type.STRING) {
+            throw new DataException(field.getCursor() + " schema type in " + dataPlace()
                 + " must be " + Schema.Type.STRING + ": " + recordStr);
         }
 
         final Struct struct = (Struct) value;
-        final String stringValue = struct.getString(fieldName);
-        if (stringValue == null) {
+        final Optional<String> stringValue = field.readAsString(struct);
+        if (stringValue.isEmpty()) {
             if (config.skipMissingOrNull()) {
-                log.debug(fieldName + " in " + dataPlace() + " is null, skipping transformation");
+                log.debug(field.getCursor() + " in " + dataPlace() + " is null, skipping transformation");
                 return Optional.empty();
             } else {
-                throw new DataException(fieldName + " in " + dataPlace() + " can't be null: " + recordStr);
+                throw new DataException(field.getCursor() + " in " + dataPlace() + " can't be null: " + recordStr);
             }
         } else {
-            final String updatedValue = hashString(stringValue);
-            final Struct updatedRecord = struct.put(fieldName, updatedValue);
+            final String updatedValue = hashString(stringValue.get());
+            final Struct updatedRecord = struct.put(field.getCursor(), updatedValue);
             return Optional.ofNullable(updatedRecord);
         }
     }
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/HashConfig.java b/src/main/java/io/aiven/kafka/connect/transforms/HashConfig.java
index dbcc16b..aeae929 100644
--- a/src/main/java/io/aiven/kafka/connect/transforms/HashConfig.java
+++ b/src/main/java/io/aiven/kafka/connect/transforms/HashConfig.java
@@ -19,6 +19,7 @@
 import java.util.Map;
 import java.util.Optional;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 
@@ -87,12 +88,12 @@ public static HashFunction fromString(final String value) {
         }
     }
 
-    Optional<String> fieldName() {
+    Optional<CursorField> field() {
         final String rawFieldName = getString(FIELD_NAME_CONFIG);
         if (null == rawFieldName || "".equals(rawFieldName)) {
             return Optional.empty();
         }
-        return Optional.of(rawFieldName);
+        return Optional.of(new CursorField(rawFieldName));
     }
 
     boolean skipMissingOrNull() {
diff --git a/src/main/java/io/aiven/kafka/connect/transforms/utils/CursorField.java b/src/main/java/io/aiven/kafka/connect/transforms/utils/CursorField.java
new file mode 100644
index 0000000..acf8dc1
--- /dev/null
+++ b/src/main/java/io/aiven/kafka/connect/transforms/utils/CursorField.java
@@ -0,0 +1,113 @@
+package io.aiven.kafka.connect.transforms.utils;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+
+/**
+ * A dot-separated path ("cursor") into a nested value: "parent.child" dives
+ * into the "parent" sub-structure and reads its "child" field. Navigation is
+ * supported over Struct, Schema and Map values.
+ */
+public class CursorField {
+
+    private final String cursor;
+
+    public CursorField(String cursor) {
+        if (cursor == null) {
+            throw new IllegalArgumentException("provided field must not be null");
+        }
+        this.cursor = cursor;
+    }
+
+    public String getCursor() {
+        return cursor;
+    }
+
+    @Override
+    public String toString() {
+        // Log statements format fields via toString; return the raw path.
+        return cursor;
+    }
+
+    public Optional<Object> read(Map<String, Object> document) {
+        return read(MAP_NAVIGATION, document, cursor);
+    }
+
+    public Optional<String> readAsString(Map<String, Object> document) {
+        return read(document).map(Object::toString);
+    }
+
+    public Field read(Schema schema) {
+        return read(SCHEMA_NAVIGATION, schema, cursor).orElse(null);
+    }
+
+    public Optional<Object> read(Struct struct) {
+        return read(STRUCT_NAVIGATION, struct, cursor);
+    }
+
+    public Optional<String> readAsString(Struct struct) {
+        return read(struct).map(Object::toString);
+    }
+
+    private <T, V> Optional<V> read(Navigation<T, V> navAlg, T navigable, String field) {
+        int firstDot = field.indexOf('.');
+
+        if (firstDot > 0) {
+            String head = field.substring(0, firstDot);
+            String tail = field.substring(firstDot + 1);
+            return navAlg.diveInto(navigable, head).flatMap(next -> read(navAlg, next, tail));
+        } else {
+            return navAlg.getValue(navigable, field);
+        }
+    }
+
+    private interface Navigation<T, V> {
+        Optional<T> diveInto(T navigable, String field);
+
+        Optional<V> getValue(T navigable, String field);
+    }
+
+    private static final Navigation<Schema, Field> SCHEMA_NAVIGATION = new Navigation<>() {
+
+        @Override
+        public Optional<Schema> diveInto(Schema navigable, String field) {
+            return getValue(navigable, field).map(Field::schema);
+        }
+
+        @Override
+        public Optional<Field> getValue(Schema navigable, String field) {
+            return Optional.ofNullable(navigable.field(field));
+        }
+    };
+
+    private static final Navigation<Struct, Object> STRUCT_NAVIGATION = new Navigation<>() {
+
+        @Override
+        public Optional<Struct> diveInto(Struct navigable, String field) {
+            return Optional.ofNullable(navigable.getStruct(field));
+        }
+
+        @Override
+        public Optional<Object> getValue(Struct navigable, String field) {
+            return Optional.ofNullable(navigable.get(field));
+        }
+    };
+
+    private static final Navigation<Map<String, Object>, Object> MAP_NAVIGATION = new Navigation<>() {
+        @Override
+        @SuppressWarnings("unchecked")
+        public Optional<Map<String, Object>> diveInto(Map<String, Object> navigable, String field) {
+            var value = navigable.get(field);
+            return value instanceof Map ? Optional.of((Map<String, Object>) value) : Optional.empty();
+        }
+
+        @Override
+        public Optional<Object> getValue(Map<String, Object> navigable, String field) {
+            return Optional.ofNullable(navigable.get(field));
+        }
+    };
+}
diff --git a/src/test/java/io/aiven/kafka/connect/transforms/ConcatFieldsConfigTest.java b/src/test/java/io/aiven/kafka/connect/transforms/ConcatFieldsConfigTest.java
index 9454527..a5ffc3e 100644
--- a/src/test/java/io/aiven/kafka/connect/transforms/ConcatFieldsConfigTest.java
+++ b/src/test/java/io/aiven/kafka/connect/transforms/ConcatFieldsConfigTest.java
@@ -16,13 +16,13 @@
 
 package io.aiven.kafka.connect.transforms;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
-
-import org.apache.kafka.common.config.ConfigException;
-
-import org.junit.jupiter.api.Test;
 
 import static io.aiven.kafka.connect.transforms.ConcatFieldsConfig.DELIMITER_CONFIG;
 import static io.aiven.kafka.connect.transforms.ConcatFieldsConfig.FIELD_NAMES_CONFIG;
@@ -36,8 +37,8 @@ class ConcatFieldsConfigTest {
     void emptyConfig() {
         final Map<String, String> props = new HashMap<>();
         assertThatThrownBy(() -> new ConcatFieldsConfig(props))
-            .isInstanceOf(ConfigException.class)
-            .hasMessage("Missing required configuration \"field.names\" which has no default value.");
+                .isInstanceOf(ConfigException.class)
+                .hasMessage("Missing required configuration \"field.names\" which has no default value.");
     }
 
     @Test
@@ -45,8 +46,8 @@ void emptyFieldName() {
         final Map<String, String> props = new HashMap<>();
         props.put(FIELD_NAMES_CONFIG, "");
         assertThatThrownBy(() -> new ConcatFieldsConfig(props))
-            .isInstanceOf(ConfigException.class)
-            .hasMessage("Missing required configuration \"output.field.name\" which has no default value.");
+                .isInstanceOf(ConfigException.class)
+                .hasMessage("Missing required configuration \"output.field.name\" which has no default value.");
     }
 
     @Test
@@ -57,7 +58,8 @@ void definedFieldName() {
         props.put(DELIMITER_CONFIG, "-");
         props.put(FIELD_REPLACE_MISSING_CONFIG, "*");
         final ConcatFieldsConfig config = new ConcatFieldsConfig(props);
-        assertThat(config.fieldNames()).isEqualTo(Arrays.asList("test", "foo", "bar"));
+        assertThat(config.fields()
+                .stream().map(CursorField::getCursor)).containsOnly("test", "foo", "bar");
         assertThat(config.outputFieldName()).isEqualTo("combined");
         assertThat(config.delimiter()).isEqualTo("-");
         assertThat(config.fieldReplaceMissing()).isEqualTo("*");
diff --git a/src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfigTest.java b/src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfigTest.java
index 8c93a75..4a6a607 100644
--- a/src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfigTest.java
+++ b/src/test/java/io/aiven/kafka/connect/transforms/ExtractTimestampConfigTest.java
@@ -49,7 +49,7 @@ void definedFieldName() {
         final Map<String, String> props = new HashMap<>();
         props.put("field.name", "test");
         final ExtractTimestampConfig config = new ExtractTimestampConfig(props);
-        assertThat(config.fieldName()).isEqualTo("test");
+        assertThat(config.field().getCursor()).isEqualTo("test");
     }
 
     @Test
diff --git a/src/test/java/io/aiven/kafka/connect/transforms/ExtractTopicConfigTest.java b/src/test/java/io/aiven/kafka/connect/transforms/ExtractTopicConfigTest.java
index 66093d9..2e3dcc8 100644
--- a/src/test/java/io/aiven/kafka/connect/transforms/ExtractTopicConfigTest.java
+++ b/src/test/java/io/aiven/kafka/connect/transforms/ExtractTopicConfigTest.java
@@ -19,6 +19,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -30,7 +31,7 @@ class ExtractTopicConfigTest {
     void defaults() {
         final Map<String, String> props = new HashMap<>();
         final ExtractTopicConfig config = new ExtractTopicConfig(props);
-        assertThat(config.fieldName()).isNotPresent();
+        assertThat(config.field()).isNotPresent();
         assertThat(config.skipMissingOrNull()).isFalse();
     }
 
@@ -48,7 +49,7 @@ void emptyFieldName() {
         final Map<String, String> props = new HashMap<>();
         props.put("field.name", "");
         final ExtractTopicConfig config = new ExtractTopicConfig(props);
-        assertThat(config.fieldName()).isNotPresent();
+        assertThat(config.field()).isNotPresent();
     }
 
     @Test
@@ -56,6 +57,6 @@ void definedFieldName() {
         final Map<String, String> props = new HashMap<>();
         props.put("field.name", "test");
         final ExtractTopicConfig config = new ExtractTopicConfig(props);
-        assertThat(config.fieldName()).hasValue("test");
+        assertThat(config.field().map(CursorField::getCursor)).hasValue("test");
     }
 }
diff --git a/src/test/java/io/aiven/kafka/connect/transforms/ExtractTopicTest.java b/src/test/java/io/aiven/kafka/connect/transforms/ExtractTopicTest.java
index 163c6a7..4ce7f32 100644
--- a/src/test/java/io/aiven/kafka/connect/transforms/ExtractTopicTest.java
+++ b/src/test/java/io/aiven/kafka/connect/transforms/ExtractTopicTest.java
@@ -31,6 +31,11 @@
 import org.junit.jupiter.params.provider.NullAndEmptySource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import static io.aiven.kafka.connect.transforms.ExtractTopicConfig.APPEND_DELIMITER_CONFIG;
+import static io.aiven.kafka.connect.transforms.ExtractTopicConfig.APPEND_TO_ORIGINAL_TOPIC_NAME_CONFIG;
+import static io.aiven.kafka.connect.transforms.ExtractTopicConfig.FIELD_NAME_CONFIG;
+import static io.aiven.kafka.connect.transforms.ExtractTopicConfig.SKIP_MISSING_OR_NULL_CONFIG;
+import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -75,7 +80,7 @@ void noFieldName_UnsupportedValueType(final boolean skipMissingOrNull) {
     @ParameterizedTest
     @NullAndEmptySource
     void noFieldName_NullOrEmptyValue_NoSkip_WithSchema(final String value) {
-        final Schema schema = Schema.STRING_SCHEMA;
+        final Schema schema = STRING_SCHEMA;
         final SinkRecord originalRecord = record(schema, value);
         assertThatThrownBy(() -> transformation(null, false).apply(originalRecord))
             .isInstanceOf(DataException.class)
@@ -94,7 +99,7 @@ void noFieldName_NullOrEmptyValue_NoSkip_Schemaless(final String value) {
     @ParameterizedTest
     @NullAndEmptySource
     void noFieldName_NullOrEmptyValue_Skip_WithSchema(final String value) {
-        final Schema schema = Schema.STRING_SCHEMA;
+        final Schema schema = STRING_SCHEMA;
         final SinkRecord originalRecord = record(schema, value);
         final SinkRecord result = transformation(null, true).apply(originalRecord);
         assertThat(result).isEqualTo(originalRecord);
@@ -129,7 +134,7 @@ void noFieldName_NormalBooleanValue(final boolean withSchema) {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void noFieldName_NormalStringValue(final boolean withSchema) {
-        final Schema schema = withSchema ? Schema.STRING_SCHEMA : null;
+        final Schema schema = withSchema ? STRING_SCHEMA : null;
         final SinkRecord originalRecord = record(schema, NEW_TOPIC);
         final SinkRecord result = transformation(null, false).apply(originalRecord);
         assertThat(result).isEqualTo(setNewTopic(originalRecord, NEW_TOPIC));
@@ -157,7 +162,7 @@ void fieldName_Schemaless_NonMap(final boolean skipMissingOrNull) {
     @ValueSource(booleans = {true, false})
     void fieldName_WithSchema_NullStruct(final boolean skipMissingOrNull) {
         final Schema schema = SchemaBuilder.struct()
-            .field(FIELD, Schema.STRING_SCHEMA)
+            .field(FIELD, STRING_SCHEMA)
             .schema();
         final SinkRecord originalRecord = record(schema, null);
         assertThatThrownBy(() -> transformation(FIELD, skipMissingOrNull).apply(originalRecord))
@@ -324,7 +329,7 @@ void fieldName_NormalStringValue(final boolean withSchema) {
         final SinkRecord originalRecord;
         if (withSchema) {
             final Schema schema = SchemaBuilder.struct()
-                .field(FIELD, Schema.STRING_SCHEMA)
+                .field(FIELD, STRING_SCHEMA)
                 .schema();
             originalRecord = record(schema, new Struct(schema).put(FIELD, NEW_TOPIC));
         } else {
@@ -335,12 +340,57 @@ void fieldName_NormalStringValue(final boolean withSchema) {
         assertThat(result).isEqualTo(setNewTopic(originalRecord, NEW_TOPIC));
     }
 
+    @Test
+    void fieldName_Nested_Schemaless() {
+        final Map<String, Object> valueMap = Map.of(
+            "parent", Map.of(
+                "child", NEW_TOPIC
+            )
+        );
+
+        final SinkRecord originalRecord = record(null, valueMap);
+        final SinkRecord result = transformation("parent.child", false).apply(originalRecord);
+        assertThat(result).isEqualTo(setNewTopic(originalRecord, NEW_TOPIC));
+    }
+
+    @Test
+    void fieldName_Nested_Schema() {
+        final Schema innerSchema = SchemaBuilder.struct()
+            .field("child", STRING_SCHEMA)
+            .build();
+        final Schema schema = SchemaBuilder.struct()
+            .field("parent", innerSchema)
+            .schema();
+        final SinkRecord originalRecord = record(
+            schema, new Struct(schema).put("parent", new Struct(innerSchema).put("child", NEW_TOPIC)));
+
+        final SinkRecord result = transformation("parent.child", false).apply(originalRecord);
+        assertThat(result).isEqualTo(setNewTopic(originalRecord, NEW_TOPIC));
+    }
+
+    @Test
+    void append_Value() {
+        final Map<String, String> props = new HashMap<>();
+        props.put(FIELD_NAME_CONFIG, FIELD);
+        props.put(APPEND_TO_ORIGINAL_TOPIC_NAME_CONFIG, Boolean.toString(true));
+        props.put(APPEND_DELIMITER_CONFIG, "##");
+        final ExtractTopic<SinkRecord> transform = createTransformationObject();
+        transform.configure(props);
+
+        final Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(FIELD, "a");
+
+        final SinkRecord originalRecord = setNewTopic(record(null, valueMap), "original");
+        final SinkRecord result = transform.apply(originalRecord);
+        assertThat(result).isEqualTo(setNewTopic(originalRecord, "original##a"));
+    }
+
     private ExtractTopic<SinkRecord> transformation(final String fieldName, final boolean skipMissingOrNull) {
         final Map<String, String> props = new HashMap<>();
         if (fieldName != null) {
-            props.put("field.name", fieldName);
+            props.put(FIELD_NAME_CONFIG, fieldName);
         }
-        props.put("skip.missing.or.null", Boolean.toString(skipMissingOrNull));
+        props.put(SKIP_MISSING_OR_NULL_CONFIG, Boolean.toString(skipMissingOrNull));
         final ExtractTopic<SinkRecord> transform = createTransformationObject();
         transform.configure(props);
         return transform;
diff --git a/src/test/java/io/aiven/kafka/connect/transforms/HashConfigTest.java b/src/test/java/io/aiven/kafka/connect/transforms/HashConfigTest.java
index acde075..109306f 100644
--- a/src/test/java/io/aiven/kafka/connect/transforms/HashConfigTest.java
+++ b/src/test/java/io/aiven/kafka/connect/transforms/HashConfigTest.java
@@ -19,6 +19,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import io.aiven.kafka.connect.transforms.utils.CursorField;
 import org.apache.kafka.common.config.ConfigException;
 import org.junit.jupiter.api.Test;
 
@@ -77,7 +78,7 @@ void emptyFieldName() {
         props.put("field.name", "");
         props.put("function", "sha256");
         final HashConfig config = new HashConfig(props);
-        assertThat(config.fieldName()).isNotPresent();
+        assertThat(config.field()).isNotPresent();
     }
 
     @Test
@@ -86,7 +87,7 @@ void definedFieldName() {
         final Map<String, String> props = new HashMap<>();
         props.put("field.name", "test");
         props.put("function", "sha256");
         final HashConfig config = new HashConfig(props);
-        assertThat(config.fieldName()).hasValue("test");
+        assertThat(config.field().map(CursorField::getCursor)).hasValue("test");
         assertThat(config.hashFunction()).isEqualTo(HashConfig.HashFunction.SHA256);
     }
 }
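Not part of the patch — a minimal, self-contained sketch of how the new CursorField resolves dot-separated paths against schemaless (Map) and schema-backed (Struct) values. The demo class name and the sample data are illustrative assumptions; the CursorField API is the one introduced above.

    import java.util.Map;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    import io.aiven.kafka.connect.transforms.utils.CursorField;

    public class CursorFieldDemo {
        public static void main(final String[] args) {
            // "parent.child" dives into "parent", then reads "child".
            final CursorField field = new CursorField("parent.child");

            // Schemaless case: nested maps.
            final Map<String, Object> value = Map.of("parent", Map.of("child", "new-topic"));
            System.out.println(field.readAsString(value)); // Optional[new-topic]

            // Schema-backed case: nested structs.
            final Schema inner = SchemaBuilder.struct().field("child", Schema.STRING_SCHEMA).build();
            final Schema outer = SchemaBuilder.struct().field("parent", inner).build();
            final Struct struct = new Struct(outer)
                .put("parent", new Struct(inner).put("child", "new-topic"));
            System.out.println(field.readAsString(struct)); // Optional[new-topic]

            // Missing map keys come back as an empty Optional rather than throwing.
            System.out.println(new CursorField("parent.absent").read(value)); // Optional.empty
        }
    }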
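Also illustrative, not part of the patch — the new ExtractTopic options as they would appear in a connector configuration. The transformation class name assumes the existing ExtractTopic$Value variant; the alias, topic, and field values are examples:

    transforms=routeByField
    transforms.routeByField.type=io.aiven.kafka.connect.transforms.ExtractTopic$Value
    transforms.routeByField.field.name=parent.child
    transforms.routeByField.append.to.topic=true
    transforms.routeByField.append.to.topic.delimiter=##
    # A record on topic "orders" whose value is {"parent": {"child": "eu"}} is
    # re-routed to "orders##eu"; with append.to.topic=false (the default) it
    # would be routed to "eu" instead.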