Skip to content

Compression support #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<groupId>com.github.jcustenborder.kafka.connect</groupId>
<artifactId>kafka-connect-parent</artifactId>
<version>2.0.0-cp1</version>
<version>2.1.1-cp1</version>
</parent>
<artifactId>kafka-connect-transform-common</artifactId>
<version>0.1.0-SNAPSHOT</version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Copyright © 2017 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.transform.common;

import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation;
import com.google.common.io.ByteStreams;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Base64;
import java.util.Map;

/**
 * Base transformation that compresses the key or value of a record.
 *
 * <p>Byte-array inputs are compressed directly. String inputs are treated as Base64-encoded
 * bytes: the string is decoded, the bytes are compressed, and the result is re-encoded as
 * Base64 so the output schema matches the input schema.
 *
 * <p>Subclasses supply the concrete compression codec via {@link #createStream(OutputStream)}.
 */
public abstract class Compress<R extends ConnectRecord<R>> extends BaseKeyValueTransformation<R> {
  public Compress(boolean isKey) {
    super(isKey);
  }

  @Override
  public ConfigDef config() {
    // This transformation has no configuration options.
    return new ConfigDef();
  }

  @Override
  public void close() {
    // No resources are held between records.
  }

  /**
   * Wraps the supplied stream with a compressing stream for the subclass's codec.
   *
   * @param input stream that receives the compressed bytes.
   * @return compressing stream; closing it must finish the compressed payload.
   * @throws IOException if the stream cannot be created.
   */
  protected abstract OutputStream createStream(OutputStream input) throws IOException;

  @Override
  public void configure(Map<String, ?> map) {
    // Nothing to configure.
  }

  @Override
  protected SchemaAndValue processString(R record, Schema inputSchema, String base64Input) {
    // Strings are assumed to carry Base64-encoded binary data. Decode, compress as bytes,
    // then re-encode so the value stays a (Base64) string under the original schema.
    byte[] input = Base64.getDecoder().decode(base64Input);
    Schema bytesSchema = inputSchema.isOptional() ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA;
    SchemaAndValue compressed = processBytes(record, bytesSchema, input);
    String result = Base64.getEncoder().encodeToString((byte[]) compressed.value());
    return new SchemaAndValue(inputSchema, result);
  }

  @Override
  protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) {
    try (InputStream inputStream = new ByteArrayInputStream(input)) {
      try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
        // The compressing stream must be fully CLOSED before the buffer is read:
        // for codecs such as GZIPOutputStream, flush() does not finish the deflater
        // or write the trailer — only close()/finish() does. Reading the buffer
        // inside the try block returned a truncated, unterminated payload.
        try (OutputStream compressStream = createStream(outputStream)) {
          ByteStreams.copy(inputStream, compressStream);
        }
        return new SchemaAndValue(inputSchema, outputStream.toByteArray());
      }
    } catch (IOException ex) {
      throw new DataException(ex);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Copyright © 2017 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.transform.common;

import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation;
import com.google.common.io.ByteStreams;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Base64;
import java.util.Map;

/**
 * Base transformation that decompresses the key or value of a record.
 *
 * <p>Byte-array inputs are decompressed directly. String inputs are treated as Base64-encoded
 * compressed bytes: decoded, decompressed, and re-encoded as Base64 so the output schema
 * matches the input schema.
 *
 * <p>Subclasses supply the concrete codec via {@link #createStream(InputStream)}.
 */
public abstract class Decompress<R extends ConnectRecord<R>> extends BaseKeyValueTransformation<R> {
  public Decompress(boolean isKey) {
    super(isKey);
  }

  @Override
  public ConfigDef config() {
    // No configuration options.
    return new ConfigDef();
  }

  @Override
  public void configure(Map<String, ?> settings) {
    // Nothing to configure.
  }

  @Override
  public void close() {
    // No resources held between records.
  }

  /**
   * Wraps the supplied stream with a decompressing stream for the subclass's codec.
   *
   * @param input stream providing the compressed bytes.
   * @return stream yielding the decompressed bytes.
   * @throws IOException if the stream cannot be created.
   */
  protected abstract InputStream createStream(InputStream input) throws IOException;

  @Override
  protected SchemaAndValue processString(R record, Schema inputSchema, String base64Input) {
    // The string carries Base64-encoded compressed data: decode it, decompress the bytes,
    // then re-encode the plain bytes so the value remains a string under the input schema.
    final byte[] compressedBytes = Base64.getDecoder().decode(base64Input);
    final Schema bytesSchema = inputSchema.isOptional() ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA;
    final SchemaAndValue decompressed = processBytes(record, bytesSchema, compressedBytes);
    final String reencoded = Base64.getEncoder().encodeToString((byte[]) decompressed.value());
    return new SchemaAndValue(inputSchema, reencoded);
  }

  @Override
  protected SchemaAndValue processBytes(R record, Schema inputSchema, byte[] input) {
    try (InputStream source = new ByteArrayInputStream(input);
         InputStream inflatingStream = createStream(source);
         ByteArrayOutputStream sink = new ByteArrayOutputStream()) {
      // Draining the decompressing stream to EOF yields the complete original payload.
      ByteStreams.copy(inflatingStream, sink);
      return new SchemaAndValue(inputSchema, sink.toByteArray());
    } catch (IOException ex) {
      throw new DataException(ex);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Date;
import java.util.Map;

@Title("ExtractTimestamp")
@Description("This transformation is used to use a field from the input data to override the timestamp for the record.")
public abstract class ExtractTimestamp<R extends ConnectRecord<R>> implements Transformation<R> {
private static final Logger log = LoggerFactory.getLogger(ExtractTimestamp.class);
public ExtractTimestampConfig config;
Expand Down Expand Up @@ -127,8 +129,23 @@ public void configure(Map<String, ?> settings) {
}


@Title("ExtractTimestamp(Key)")
@Description("This transformation is used to use a field from the input data to override the timestamp for the record.")
public static class Key<R extends ConnectRecord<R>> extends ExtractTimestamp<R> {

  /**
   * Overrides the record timestamp with a value extracted from the record KEY.
   *
   * <p>Fix: this Key variant previously processed {@code r.valueSchema()}/{@code r.value()}
   * (copy-paste from the Value variant) and its {@code @Title} read "ExtractTimestamp(Value)";
   * it now extracts the timestamp from the key as its name promises.
   */
  @Override
  public R apply(R r) {
    final long timestamp = process(new SchemaAndValue(r.keySchema(), r.key()));
    // Key and value pass through unchanged; only the record timestamp is replaced.
    return r.newRecord(
        r.topic(),
        r.kafkaPartition(),
        r.keySchema(),
        r.key(),
        r.valueSchema(),
        r.value(),
        timestamp
    );
  }
}

public static class Value<R extends ConnectRecord<R>> extends ExtractTimestamp<R> {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright © 2017 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.transform.common;

import org.apache.kafka.connect.connector.ConnectRecord;

import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

/**
 * {@link Compress} transformation backed by the GZIP codec.
 *
 * <p>Use the nested {@link Key} or {@link Value} class to target the record key or value.
 */
public abstract class GzipCompress<R extends ConnectRecord<R>> extends Compress<R> {

  public GzipCompress(boolean isKey) {
    super(isKey);
  }

  /** Wraps the destination stream in a {@link GZIPOutputStream}. */
  @Override
  protected OutputStream createStream(OutputStream input) throws IOException {
    return new GZIPOutputStream(input);
  }

  /** GZIP-compresses the record key. */
  public static class Key<R extends ConnectRecord<R>> extends GzipCompress<R> {
    public Key() {
      super(true);
    }
  }

  /** GZIP-compresses the record value. */
  public static class Value<R extends ConnectRecord<R>> extends GzipCompress<R> {
    public Value() {
      super(false);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright © 2017 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.transform.common;

import org.apache.kafka.connect.connector.ConnectRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

/**
 * {@link Decompress} transformation backed by the GZIP codec.
 *
 * <p>Use the nested {@link Key} or {@link Value} class to target the record key or value.
 */
public abstract class GzipDecompress<R extends ConnectRecord<R>> extends Decompress<R> {

  public GzipDecompress(boolean isKey) {
    super(isKey);
  }

  /** Wraps the source stream in a {@link GZIPInputStream}. */
  @Override
  protected InputStream createStream(InputStream input) throws IOException {
    return new GZIPInputStream(input);
  }

  /** GZIP-decompresses the record key. */
  public static class Key<R extends ConnectRecord<R>> extends GzipDecompress<R> {
    public Key() {
      super(true);
    }
  }

  /** GZIP-decompresses the record value. */
  public static class Value<R extends ConnectRecord<R>> extends GzipDecompress<R> {
    public Value() {
      super(false);
    }
  }
}
Loading