scripts/conduit-kafka-connect-wrapper (1 addition, 1 deletion)
@@ -1,3 +1,3 @@
#!/usr/bin/env bash
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
exec /usr/bin/env java -cp "$SCRIPT_DIR/conduit-kafka-connect-wrapper-0.3.0.jar:$SCRIPT_DIR/libs/*" io.conduit.Application
exec /usr/bin/env java --add-opens=java.base/java.nio=ALL-UNNAMED -cp "$SCRIPT_DIR/conduit-kafka-connect-wrapper-0.3.0.jar:$SCRIPT_DIR/libs/*" io.conduit.Application
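
The --add-opens=java.base/java.nio=ALL-UNNAMED flag opens the java.nio package for reflection, which JDK 16+ blocks by default; presumably a wrapped connector needs such access. A minimal, hypothetical illustration of the failure mode (the class below is not part of this PR):

import java.lang.reflect.Field;
import java.nio.ByteBuffer;

// Hypothetical demo: reflective access into java.nio fails on JDK 16+ unless the
// package is opened to the unnamed module via --add-opens=java.base/java.nio=ALL-UNNAMED.
public class AddOpensIllustration {
    public static void main(String[] args) throws Exception {
        Field hb = ByteBuffer.class.getDeclaredField("hb");
        // Throws InaccessibleObjectException without the --add-opens flag.
        hb.setAccessible(true);
        System.out.println("java.nio is open for reflection");
    }
}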
scripts/conduit-kafka-connect-wrapper-debug (7 additions, 0 deletions)
@@ -0,0 +1,7 @@
#!/usr/bin/env bash
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
exec /usr/bin/env java \
-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,quiet=y,address=*:5005 \
-cp "$SCRIPT_DIR/conduit-kafka-connect-wrapper-0.3.0.jar:$SCRIPT_DIR/libs/*" \
--add-opens=java.base/java.nio=ALL-UNNAMED \
io.conduit.Application
src/main/java/io/conduit/CombinedSchemaProvider.java (13 additions, 0 deletions)
@@ -42,4 +42,17 @@ public Schema provide(Record rec) {
}
return null;
}

@Override
public String name() {
if (rawDataSP != null) {
return rawDataSP.name();
}

if (structSP != null) {
return structSP.name();
}

return null;
}
}
src/main/java/io/conduit/DefaultDestinationStream.java (36 additions, 11 deletions)
@@ -18,6 +18,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import com.google.protobuf.ByteString;
import io.conduit.grpc.Destination;
@@ -27,6 +28,7 @@
import lombok.SneakyThrows;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -45,13 +47,20 @@ public class DefaultDestinationStream implements StreamObserver<Destination.Run.
private final StreamObserver<Destination.Run.Response> responseObserver;
private final ToConnectData toConnectData;

private final SimpleDestinationTaskCtx taskCtx;

private final AtomicLong offsetCounter;

public DefaultDestinationStream(SinkTask task,
SchemaProvider schemaProvider,
SimpleDestinationTaskCtx taskCtx,
StreamObserver<Destination.Run.Response> responseObserver) {
this.task = task;
this.schemaProvider = schemaProvider;
this.responseObserver = responseObserver;
this.toConnectData = new ToConnectData();
this.taskCtx = taskCtx;
this.offsetCounter = new AtomicLong(1);
}

@Override
@@ -60,7 +69,7 @@ public void onNext(Destination.Run.Request request) {
// Currently, Conduit requires all writes to be asynchronous.
// See: pkg/connector/destination.go, method Write().
Record rec = request.getRecord();
doWrite(rec);
doWriteWithOffsetRetry(rec);
responseObserver.onNext(responseWith(rec.getPosition()));
} catch (Exception e) {
logger.error("Couldn't write record.", e);
@@ -80,8 +89,20 @@ private Destination.Run.Response responseWith(ByteString position) {
.build();
}

private void doWrite(Record rec) {
SinkRecord sinkRecord = toSinkRecord(rec);
private void doWriteWithOffsetRetry(Record rec) {
synchronized (taskCtx) {
Reviewer comment (Contributor): I guess this is because you think we might write multiple records at the same time. If that's the case, I don't think Conduit will ever send a batch before the previous one is "done".
SinkRecord sinkRecord = toSinkRecord(rec);
doWrite(sinkRecord);

TopicPartition tp = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition());
if (taskCtx.isReset(tp, sinkRecord.kafkaOffset())) {
taskCtx.ackResetOffset(tp, sinkRecord.kafkaOffset());
doWrite(sinkRecord);
}
}
}

private void doWrite(SinkRecord sinkRecord) {
task.put(List.of(sinkRecord));
task.preCommit(Map.of(
new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition()),
@@ -94,21 +115,25 @@ private SinkRecord toSinkRecord(Record rec) {
// todo cache the JSON object
// Also related to: https://github.com/ConduitIO/conduit-kafka-connect-wrapper/issues/58
var schema = schemaProvider.provide(rec);

Object value = toConnectData.apply(rec, schema);

var schemaUsed = getSchema(value, schema);

// On the first record, seed the offset counter with the current time in millis.
offsetCounter.compareAndSet(1, System.currentTimeMillis());

// While there's no real topic involved, we still assign values
// to topic, partition and offset since the underlying connector might use them.
// The offset counter is seeded with System.currentTimeMillis() and incremented
// per record, to mimic the increasing offset values of a Kafka topic partition.
return new SinkRecord(
schemaUsed != null ? schemaUsed.name() : null,
0,
Schema.STRING_SCHEMA,
rec.getKey().getRawData().toStringUtf8(),
schemaUsed,
value,
System.currentTimeMillis()
schemaUsed != null ? schemaUsed.name() : schemaProvider.name(),
0,
Schema.STRING_SCHEMA,
rec.getKey().getRawData().toStringUtf8(),
schemaUsed,
value,
offsetCounter.incrementAndGet()
);
}

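
For context on doWriteWithOffsetRetry above: a Kafka Connect sink task can request re-delivery of a record by calling SinkTaskContext.offset(...) from put(), for example after a transient failure. The sketch below is a hypothetical task (not a connector from this PR) showing the behaviour that the single retry accommodates:

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical sink task: on the first put() it asks the runtime to resend the
// record's offset; SimpleDestinationTaskCtx records that request and
// DefaultDestinationStream then replays the record once.
public class ResettingSinkTask extends SinkTask {
    private boolean failedOnce;

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord rec : records) {
            if (!failedOnce) {
                failedOnce = true;
                // Request re-delivery of this record instead of processing it now.
                context.offset(new TopicPartition(rec.topic(), rec.kafkaPartition()), rec.kafkaOffset());
                return;
            }
            // Process the record on the retried delivery.
        }
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void stop() {
    }

    @Override
    public String version() {
        return "0.0.0";
    }
}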
src/main/java/io/conduit/DestinationService.java (5 additions, 1 deletion)
@@ -41,6 +41,8 @@ public class DestinationService extends DestinationPluginGrpc.DestinationPluginI
private boolean started;
private SchemaProvider schemaProvider;

private SimpleDestinationTaskCtx taskCtx;

public DestinationService(TaskFactory taskFactory) {
this.taskFactory = taskFactory;
}
@@ -72,6 +74,7 @@ private void doConfigure(DestinationConfig config) {
this.task = taskFactory.newSinkTask(config.getConnectorClass());
this.schemaProvider = buildSchemaProvider(config);
this.config = config.getKafkaConnectorCfg();
this.taskCtx = new SimpleDestinationTaskCtx(this.config);
}

private SchemaProvider buildSchemaProvider(DestinationConfig config) {
@@ -105,6 +108,7 @@ public void start(Destination.Start.Request request, StreamObserver<Destination.
logger.info("Starting the destination.");

try {
task.initialize(taskCtx);
task.start(config);
started = true;
logger.info("Destination started.");
@@ -121,7 +125,7 @@ public void start(Destination.Start.Request request, StreamObserver<Destination.

@Override
public StreamObserver<Destination.Run.Request> run(StreamObserver<Destination.Run.Response> responseObserver) {
this.runStream = new DefaultDestinationStream(task, schemaProvider, responseObserver);
this.runStream = new DefaultDestinationStream(task, schemaProvider, taskCtx, responseObserver);
return runStream;
}

src/main/java/io/conduit/FixedSchemaProvider.java (8 additions, 0 deletions)
@@ -34,4 +34,12 @@ public FixedSchemaProvider(Schema schema) {
public Schema provide(Record rec) {
return schema;
}

@Override
public String name() {
if (schema == null) {
return null;
}
return schema.name();
}
}
src/main/java/io/conduit/RawDataSchemaProvider.java (5 additions, 0 deletions)
@@ -48,6 +48,11 @@ public Schema provide(Record rec) {
return schemaForJson(json);
}

@Override
public String name() {
return name;
}

private Schema schemaForJson(JsonNode json) {
switch (json.getNodeType()) {
case OBJECT:
src/main/java/io/conduit/SchemaProvider.java (2 additions, 0 deletions)
@@ -24,4 +24,6 @@
*/
public interface SchemaProvider {
Schema provide(Record rec);

/** Returns the name of the schemas this provider creates, or null if none is known. */
String name();
}
src/main/java/io/conduit/SimpleDestinationTaskCtx.java (87 additions, 0 deletions)
@@ -0,0 +1,87 @@
/*
* Copyright 2022 Meroxa, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.conduit;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;


/**
* A {@link SinkTaskContext} implementation which also provides
* the sink task's configuration and position (a mapping from partitions to offsets).
*/
public class SimpleDestinationTaskCtx implements SinkTaskContext {
private final Map<String, String> config;
private Map<TopicPartition, Long> resetTopicPartitionOffset;

public SimpleDestinationTaskCtx(Map<String, String> config) {
this.config = config;
this.resetTopicPartitionOffset = new HashMap<>();
}

@Override
public Map<String, String> configs() {
return config;
}

@Override
public void offset(Map<TopicPartition, Long> offsets) {
}

@Override
public void offset(TopicPartition tp, long offset) {
resetTopicPartitionOffset.put(tp, offset);
}

@Override
public void timeout(long timeoutMs) {
throw new UnsupportedOperationException();
}

@Override
public Set<TopicPartition> assignment() {
return null;
}

@Override
public void pause(TopicPartition... partitions) {
throw new UnsupportedOperationException();
}

@Override
public void resume(TopicPartition... partitions) {
throw new UnsupportedOperationException();
}

@Override
public void requestCommit() {
throw new UnsupportedOperationException();
}

/** Returns true if a reset to {@code offset} was requested for {@code tp} and not yet acknowledged. */
public boolean isReset(TopicPartition tp, long offset) {
return resetTopicPartitionOffset.containsKey(tp)
&& resetTopicPartitionOffset.get(tp) == offset;
}

/** Acknowledges (clears) a previously requested offset reset for {@code tp}. */
public void ackResetOffset(TopicPartition tp, long offset) {
resetTopicPartitionOffset.remove(tp, offset);
}
}
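
A minimal usage sketch of the reset bookkeeping above (hypothetical values, not from this PR): offset(tp, o) registers a requested re-delivery, isReset(tp, o) reports it, and ackResetOffset(tp, o) clears it once the record has been written again.

import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class ResetBookkeepingDemo {
    public static void main(String[] args) {
        SimpleDestinationTaskCtx ctx = new SimpleDestinationTaskCtx(Map.of());
        TopicPartition tp = new TopicPartition("users", 0);

        ctx.offset(tp, 42L);
        System.out.println(ctx.isReset(tp, 42L));  // true: a reset to offset 42 is pending

        ctx.ackResetOffset(tp, 42L);
        System.out.println(ctx.isReset(tp, 42L));  // false: acknowledged, no further retry
    }
}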
src/main/java/io/conduit/StructSchemaProvider.java (5 additions, 0 deletions)
@@ -43,6 +43,11 @@ public Schema provide(Record rec) {
return schemaForStruct(rec.getPayload().getAfter().getStructuredData());
}

@Override
public String name() {
return name;
}

private Schema schemaForStruct(Struct struct) {
SchemaBuilder builder = new SchemaBuilder(Schema.Type.STRUCT)
.name(name)
src/test/java/io/conduit/DefaultDestinationStreamTest.java (3 additions, 0 deletions)
@@ -54,6 +54,7 @@ public void testRecordWrittenSuccesfully() {
var underTest = new DefaultDestinationStream(
task,
schemaProvider,
new SimpleDestinationTaskCtx(null),
responseObserver
);

@@ -111,6 +112,7 @@ public void testOnError() {
var underTest = new DefaultDestinationStream(
task,
schemaProvider,
new SimpleDestinationTaskCtx(null),
responseObserver
);

@@ -127,6 +129,7 @@ public void testOnCompleted() {
var underTest = new DefaultDestinationStream(
task,
schemaProvider,
new SimpleDestinationTaskCtx(null),
responseObserver
);

src/test/java/io/conduit/DestinationStreamTest.java (3 additions, 3 deletions)
@@ -50,12 +50,12 @@ public void setUp() {
.field("id", Schema.INT32_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.build();
this.underTest = new DefaultDestinationStream(task, new FixedSchemaProvider(schema), streamObserver);
this.underTest = new DefaultDestinationStream(task, new FixedSchemaProvider(schema), new SimpleDestinationTaskCtx(null), streamObserver);
}

@Test
public void testWriteRecordNoSchema() {
DefaultDestinationStream underTest = new DefaultDestinationStream(task, new FixedSchemaProvider(null), streamObserver);
DefaultDestinationStream underTest = new DefaultDestinationStream(task, new FixedSchemaProvider(null), new SimpleDestinationTaskCtx(null), streamObserver);
Destination.Run.Request request = newRequest();
Record record = request.getRecord();

@@ -156,4 +156,4 @@ private Record newRecord() {
.putMetadata(Opencdc.metadataCreatedAt.getDefaultValue(), "123456000000000")
.build();
}
}
}