Skip to content

Commit 8651362

Browse files
stheppistheppi
and
stheppi
authored
GCP PubSub Source (#209)
The code was not wiring in the headers mapping. The change just ensures the SourceRecord created contains the headers as returned by the headers mapper Co-authored-by: stheppi <[email protected]>
1 parent 59856da commit 8651362

File tree

1 file changed

+12
-1
lines changed
  • java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping

1 file changed

+12
-1
lines changed

java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverter.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
import org.apache.kafka.common.config.ConfigException;
1919
import org.apache.kafka.connect.data.Schema;
20+
import org.apache.kafka.connect.header.ConnectHeaders;
21+
import org.apache.kafka.connect.header.Header;
22+
import org.apache.kafka.connect.header.Headers;
2023
import org.apache.kafka.connect.source.SourceRecord;
2124

2225
import io.lenses.streamreactor.common.config.base.intf.Converter;
@@ -34,14 +37,22 @@ public class SourceRecordConverter extends Converter<PubSubMessageData, SourceRe
3437

3538
@Override
3639
protected SourceRecord convert(final PubSubMessageData source) throws ConfigException {
40+
41+
final Headers headers = new ConnectHeaders();
42+
mappingConfig.getHeaderMapper().mapHeaders(source).forEach((k, v) -> {
43+
headers.add(k, v, Schema.STRING_SCHEMA);
44+
});
3745
return new SourceRecord(
3846
source.getSourcePartition().toMap(),
3947
source.getSourceOffset().toMap(),
4048
source.getTargetTopicName(),
49+
null,
4150
getKeySchema(),
4251
getKey(source),
4352
getValueSchema(),
44-
getValue(source)
53+
getValue(source),
54+
System.currentTimeMillis(),
55+
headers
4556
);
4657
}
4758

0 commit comments

Comments
 (0)