Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public KeySourceRecordReader(RedisSourceConfig sourceConfig, Duration idleTimeou
}

@Override
public void open() throws Exception {
public void open(Map<String, Object> offset) throws Exception {
RedisURI uri = config.uri();
this.client = config.client(uri);
this.pool = config.pool(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public enum ReaderType {
public static final String STREAM_CONSUMER_GROUP_DEFAULT = "kafka-consumer-group";
public static final String STREAM_CONSUMER_GROUP_DOC = "Stream consumer group";

public static final String STREAM_DELIVERY_TYPE_AT_MOST_ONCE = "at-most-once";
public static final String STREAM_DELIVERY_TYPE_AT_LEAST_ONCE = "at-least-once";

public static final String STREAM_DELIVERY_TYPE_CONFIG = "redis.stream.delivery.type";
public static final String STREAM_DELIVERY_TYPE_DEFAULT = STREAM_DELIVERY_TYPE_AT_LEAST_ONCE;
public static final String STREAM_DELIVERY_TYPE_DOC = "The delivery guarantee, either 'at-least-once' or 'at-most-once'";

public static final String STREAM_CONSUMER_NAME_CONFIG = "redis.stream.consumer.name";
public static final String STREAM_CONSUMER_NAME_DEFAULT = "consumer-" + TOKEN_TASK;
public static final String STREAM_CONSUMER_NAME_DOC = "A format string for the stream consumer, which may contain '"
Expand All @@ -85,6 +92,7 @@ public enum ReaderType {
private final Long batchSize;
private final Long streamBlock;
private final String topicName;
private final String streamDeliveryType;

public RedisSourceConfig(Map<?, ?> originals) {
super(new RedisSourceConfigDef(), originals);
Expand All @@ -97,6 +105,7 @@ public RedisSourceConfig(Map<?, ?> originals) {
this.streamConsumerGroup = getString(STREAM_CONSUMER_GROUP_CONFIG);
this.streamConsumerName = getString(STREAM_CONSUMER_NAME_CONFIG);
this.streamBlock = getLong(STREAM_BLOCK_CONFIG);
this.streamDeliveryType = getString(STREAM_DELIVERY_TYPE_CONFIG);
}

public ReaderType getReaderType() {
Expand Down Expand Up @@ -131,6 +140,10 @@ public String getStreamConsumerName() {
return streamConsumerName;
}

/**
 * Returns the configured stream delivery guarantee, either
 * {@code "at-least-once"} or {@code "at-most-once"}.
 */
public String getStreamDeliveryType() {
return streamDeliveryType;
}

/**
 * Returns the configured destination topic name (may be a format string;
 * NOTE(review): token expansion happens elsewhere — confirm against the config def).
 */
public String getTopicName() {
return topicName;
}
Expand Down Expand Up @@ -170,6 +183,12 @@ private void define() {
define(ConfigKeyBuilder.of(STREAM_BLOCK_CONFIG, ConfigDef.Type.LONG).defaultValue(STREAM_BLOCK_DEFAULT)
.importance(ConfigDef.Importance.LOW).documentation(STREAM_BLOCK_DOC)
.validator(ConfigDef.Range.atLeast(1L)).build());
define(ConfigKeyBuilder.of(STREAM_DELIVERY_TYPE_CONFIG, ConfigDef.Type.STRING)
.documentation(STREAM_DELIVERY_TYPE_DOC).defaultValue(STREAM_DELIVERY_TYPE_DEFAULT)
.importance(ConfigDef.Importance.MEDIUM)
.validator(ConfigDef.ValidString.in(STREAM_DELIVERY_TYPE_AT_LEAST_ONCE,
STREAM_DELIVERY_TYPE_AT_MOST_ONCE))
.build());
}

}
Expand All @@ -179,25 +198,29 @@ public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + Objects.hash(batchSize, keyPatterns, readerType, streamBlock, streamConsumerGroup,
streamConsumerName, streamName, streamOffset, topicName);
streamConsumerName, streamName, streamOffset, topicName, streamDeliveryType);
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
if (this == obj) {
return true;
if (!super.equals(obj))
}
if (!super.equals(obj)) {
return false;
if (getClass() != obj.getClass())
}
if (getClass() != obj.getClass()) {
return false;
RedisSourceConfig other = (RedisSourceConfig) obj;
}
final RedisSourceConfig other = (RedisSourceConfig) obj;
return Objects.equals(batchSize, other.batchSize) && Objects.equals(keyPatterns, other.keyPatterns)
&& readerType == other.readerType && Objects.equals(streamBlock, other.streamBlock)
&& Objects.equals(streamConsumerGroup, other.streamConsumerGroup)
&& Objects.equals(streamConsumerName, other.streamConsumerName)
&& Objects.equals(streamName, other.streamName) && Objects.equals(streamOffset, other.streamOffset)
&& Objects.equals(topicName, other.topicName);
&& Objects.equals(topicName, other.topicName)
&& Objects.equals(streamDeliveryType, other.streamDeliveryType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
package com.redis.kafka.connect.source;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
Expand All @@ -43,9 +45,13 @@ public SourceRecordReader getReader() {

@Override
public void start(Map<String, String> props) {
Map<String, Object> offset = null;
if (context != null) {
offset = context.offsetStorageReader().offset(Collections.emptyMap());
}
this.reader = reader(props);
try {
this.reader.open();
this.reader.open(offset);
} catch (Exception e) {
throw new RetriableException("Could not open reader", e);
}
Expand All @@ -70,6 +76,16 @@ public void stop() {
}
}

// NOTE(review): presumably the Kafka Connect SourceTask periodic-commit hook —
// confirm against the SourceTask javadoc. Simply forwards to the reader, whose
// default implementation is a no-op.
@Override
public void commit() throws InterruptedException {
reader.commit();
}

// Per-record acknowledgement callback: forwards each delivered record (and its
// Kafka metadata) to the reader. NOTE(review): assumes reader was initialized in
// start() before the framework invokes this — confirm lifecycle ordering.
@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
reader.commitRecord(record, metadata);
}

@Override
public List<SourceRecord> poll() {
return reader.poll();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package com.redis.kafka.connect.source;

import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Contract for reading records from Redis and handing them to the source task.
 *
 * The pasted diff interleaved old and new member declarations (duplicate
 * {@code poll()} and {@code close()}, both the old and new {@code open}
 * signatures); this is the clean new-side interface.
 */
public interface SourceRecordReader {

    /**
     * Opens the reader.
     *
     * @param offset the last committed source offset restored by the task, or
     *               {@code null} when none is available
     * @throws Exception if the reader cannot be opened
     */
    void open(Map<String, Object> offset) throws Exception;

    /** Returns the next batch of records to publish. */
    List<SourceRecord> poll();

    /** Periodic commit hook; no-op by default. */
    default void commit() throws InterruptedException {
    }

    /** Per-record acknowledgement hook; no-op by default. */
    default void commitRecord(SourceRecord r, RecordMetadata m) throws InterruptedException {
    }

    /** Releases any resources held by the reader. */
    void close();
}
Loading