Commit a46ffa4

To #103: Support PB format deserializer in the Kafka source.

1 parent 7bd7aef · commit a46ffa4

File tree

12 files changed: +374 −9 lines

bitsail-components/bitsail-component-formats/bitsail-component-format-pb/pom.xml (new file)

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>bitsail-component-formats</artifactId>
    <groupId>com.bytedance.bitsail</groupId>
    <version>${revision}</version>
  </parent>
  <modelVersion>4.0.0</modelVersion>

  <artifactId>bitsail-component-format-pb</artifactId>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>

</project>

bitsail-components/bitsail-component-formats/pom.xml

Lines changed: 1 addition & 0 deletions

@@ -32,6 +32,7 @@
   <packaging>pom</packaging>
   <modules>
     <module>bitsail-component-format-json</module>
+    <module>bitsail-component-format-pb</module>
   </modules>

   <dependencies>

bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/pom.xml

Lines changed: 4 additions & 0 deletions

@@ -98,6 +98,10 @@
       <version>${revision}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.bytedance.bitsail</groupId>
+      <artifactId>bitsail-flink-row-parser</artifactId>
+    </dependency>
   </dependencies>

</project>

bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/DeserializationSchemaFactory.java

Lines changed: 9 additions & 0 deletions

@@ -40,6 +40,7 @@
 public class DeserializationSchemaFactory {
   private static final String STREAMING_FILE_DESERIALIZATION_SCHEMA_KEY = "streaming_file";
   private static final String JSON_DESERIALIZATION_SCHEMA_KEY = "json";
+  private static final String PB_DESERIALIZATION_SCHEMA_KEY = "protobuf";

   public static KafkaDeserializationSchema<Row> getDeserializationSchema(BitSailConfiguration configuration) {
     String formatType = configuration.get(BaseMessageQueueReaderOptions.FORMAT_TYPE);
@@ -64,6 +65,14 @@ public static KafkaDeserializationSchema<Row> getDeserializationSchema(BitSailCo
           .build());
     }

+    if (StringUtils.equalsIgnoreCase(PB_DESERIALIZATION_SCHEMA_KEY, formatType)) {
+      try {
+        return new CountKafkaDeserializationSchemaWrapper<>(configuration, new PbDeserializationSchema(configuration));
+      } catch (Exception e) {
+        throw new IllegalArgumentException("Pb parser encountered error during initialization.", e);
+      }
+    }
+
     throw new IllegalArgumentException(String.format("Unsupported %s format type.", formatType));
   }
 }
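
For orientation, a minimal, hedged sketch of steering a job into this new branch. BitSailConfiguration.newDefault() and the base64Fds variable are assumptions for illustration; FORMAT_TYPE, PROTO_DESCRIPTOR, and PROTO_CLASS_NAME are the options actually exercised by this factory and by the tests further down:

// Hedged sketch: select the new PB deserialization schema for a Kafka source job.
BitSailConfiguration conf = BitSailConfiguration.newDefault();    // assumed helper for an empty config
conf.set(BaseMessageQueueReaderOptions.FORMAT_TYPE, "protobuf");  // matches PB_DESERIALIZATION_SCHEMA_KEY
conf.set(RowParserOptions.PROTO_DESCRIPTOR, base64Fds);           // Base64 of a compiled FileDescriptorSet
conf.set(RowParserOptions.PROTO_CLASS_NAME, "ProtoTest");         // message name inside the descriptor
KafkaDeserializationSchema<Row> schema =
    DeserializationSchemaFactory.getDeserializationSchema(conf);
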
bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/main/java/com/bytedance/bitsail/connector/legacy/kafka/deserialization/PbDeserializationSchema.java (new file)

Lines changed: 100 additions & 0 deletions

@@ -0,0 +1,100 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.bytedance.bitsail.connector.legacy.kafka.deserialization;

import com.bytedance.bitsail.batch.file.parser.PbBytesParser;
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.flink.core.typeinfo.PrimitiveColumnTypeInfo;

import com.google.protobuf.Descriptors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.util.List;

@Internal
public class PbDeserializationSchema implements DeserializationSchema<Row> {
  private static final long serialVersionUID = -2556547991095476394L;
  private final PbBytesParser parser;
  private final RowTypeInfo rowTypeInfo;
  private final int arity;

  public PbDeserializationSchema(BitSailConfiguration jobConf) throws Exception {
    this.parser = new PbBytesParser(jobConf);

    List<Descriptors.FieldDescriptor> fields = parser.getDescriptor().getFields();
    this.arity = fields.size();
    PrimitiveColumnTypeInfo<?>[] types = new PrimitiveColumnTypeInfo[arity];
    String[] fieldNames = new String[arity];
    for (int i = 0; i < arity; i++) {
      Descriptors.FieldDescriptor field = fields.get(i);
      types[i] = getColumnTypeInfo(field);
      fieldNames[i] = field.getJsonName();
    }
    this.rowTypeInfo = new RowTypeInfo(types, fieldNames);
  }

  private PrimitiveColumnTypeInfo<?> getColumnTypeInfo(Descriptors.FieldDescriptor field) {
    switch (field.getJavaType()) {
      case INT:
      case LONG:
        return PrimitiveColumnTypeInfo.LONG_COLUMN_TYPE_INFO;
      case FLOAT:
      case DOUBLE:
        return PrimitiveColumnTypeInfo.DOUBLE_COLUMN_TYPE_INFO;
      case BOOLEAN:
        return PrimitiveColumnTypeInfo.BOOL_COLUMN_TYPE_INFO;
      case BYTE_STRING:
        return PrimitiveColumnTypeInfo.BYTES_COLUMN_TYPE_INFO;
      case MESSAGE:
      case STRING:
      case ENUM:
      default:
        return PrimitiveColumnTypeInfo.STRING_COLUMN_TYPE_INFO;
    }
  }

  @Override
  public void open(DeserializationSchema.InitializationContext context) throws Exception {
  }

  @Override
  public Row deserialize(byte[] value) throws IOException {
    return this.parser.parse(new Row(arity), value, 0, value.length, null, rowTypeInfo);
  }

  @Override
  public void deserialize(byte[] value, Collector<Row> out) throws IOException {
    out.collect(deserialize(value));
  }

  @Override
  public boolean isEndOfStream(Row row) {
    return false;
  }

  @Override
  public TypeInformation<Row> getProducedType() {
    return rowTypeInfo;
  }
}
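
The constructor derives the RowTypeInfo once from the PbBytesParser descriptor, collapsing protobuf's Java types into five BitSail column types (INT and LONG both widen to LONG_COLUMN_TYPE_INFO; MESSAGE and ENUM fall back to strings), so per-record work is a single parse call. A hedged round-trip sketch, assuming descriptor is the ProtoTest type from the kafka.proto test resource below and schema was built from a matching configuration:

// Encode one dynamic message and decode it into a Flink Row.
DynamicMessage msg = DynamicMessage.newBuilder(descriptor)
    .setField(descriptor.findFieldByName("ID"), 1L)
    .setField(descriptor.findFieldByName("NAME"), "bitsail")
    .setField(descriptor.findFieldByName("DATE"), "2022-10-01")
    .build();
Row row = schema.deserialize(msg.toByteArray());  // three columns: LONG, STRING, STRING
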

bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/java/com/bytedance/bitsail/connector/legacy/kafka/source/KafkaSourceITCase.java

Lines changed: 110 additions & 9 deletions

@@ -17,22 +17,36 @@

 package com.bytedance.bitsail.connector.legacy.kafka.source;

+import com.bytedance.bitsail.batch.file.parser.PbBytesParser;
 import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.parser.option.RowParserOptions;
 import com.bytedance.bitsail.test.connector.test.EmbeddedFlinkCluster;
 import com.bytedance.bitsail.test.connector.test.testcontainers.kafka.KafkaCluster;
 import com.bytedance.bitsail.test.connector.test.utils.JobConfUtils;

 import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import org.apache.commons.io.IOUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

+import java.io.File;
+import java.net.URI;
+import java.util.Base64;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -59,34 +73,121 @@ private static String constructARecord(int index) {
   public void before() {
     kafkaCluster.startService();
     kafkaCluster.createTopic(topicName);
-    startSendDataToKafka();
   }

-  private void startSendDataToKafka() {
+  private void startSendJsonDataToKafka() {
     KafkaProducer<String, String> producer = kafkaCluster.getProducer(topicName);
     ScheduledThreadPoolExecutor produceService = new ScheduledThreadPoolExecutor(1);
     AtomicInteger sendCount = new AtomicInteger(0);
     produceService.scheduleAtFixedRate(() -> {
       try {
-        for (int i = 0; i < 5000; ++i) {
-          String record = constructARecord(sendCount.getAndIncrement());
-          producer.send(new ProducerRecord(topicName, record));
-        }
+        for (int i = 0; i < 5000; ++i) {
+          String record = constructARecord(sendCount.getAndIncrement());
+          producer.send(new ProducerRecord(topicName, record));
+        }
       } catch (Exception e) {
-        LOG.error("failed to send a record");
+        LOG.error("failed to send a record");
       } finally {
-        LOG.info(">>> kafka produce count: {}", sendCount.get());
+        LOG.info(">>> kafka produce count: {}", sendCount.get());
+      }
+    }, 0, 1, TimeUnit.SECONDS);
+  }
+
+  private void startSendPbDataToKafka(BitSailConfiguration configuration) throws Exception {
+    PbBytesParser parser = new PbBytesParser(configuration);
+    List<Descriptors.FieldDescriptor> fields = parser.getDescriptor().getFields();
+    Descriptors.Descriptor type = parser.getDescriptor();
+
+    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(
+        ImmutableMap.of(
+            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCluster.getBootstrapServer(),
+            ProducerConfig.CLIENT_ID_CONFIG, "producer"
+        ),
+        new StringSerializer(),
+        new ByteArraySerializer()
+    );
+    ScheduledThreadPoolExecutor produceService = new ScheduledThreadPoolExecutor(1);
+    AtomicInteger sendCount = new AtomicInteger(0);
+    produceService.scheduleAtFixedRate(() -> {
+      try {
+        for (int i = 0; i < 5000; ++i) {
+          DynamicMessage.Builder builder = DynamicMessage.newBuilder(type);
+          for (Descriptors.FieldDescriptor field : fields) {
+            switch (field.getJavaType()) {
+              case INT:
+                builder.setField(field, i);
+                break;
+              case LONG:
+                builder.setField(field, (long) i);
+                break;
+              case FLOAT:
+                builder.setField(field, (float) i);
+                break;
+              case DOUBLE:
+                builder.setField(field, (double) i);
+                break;
+              case BOOLEAN:
+                builder.setField(field, i % 2 == 0);
+                break;
+              case BYTE_STRING:
+                builder.setField(field, ByteString.copyFrom(("bytes_" + i).getBytes()));
+                break;
+              case MESSAGE:
+              case STRING:
+              case ENUM:
+              default:
+                builder.setField(field, "text_" + i);
+                break;
+            }
+          }
+          DynamicMessage message = builder.build();
+          producer.send(new ProducerRecord(topicName, message.toByteArray()));
+          sendCount.getAndIncrement();
+        }
+      } catch (Exception e) {
+        LOG.error("failed to send a record");
+      } finally {
+        LOG.info(">>> kafka produce count: {}", sendCount.get());
       }
     }, 0, 1, TimeUnit.SECONDS);
   }

   @Test
-  public void testKafkaSource() throws Exception {
+  public void testKafkaSourceJsonFormat() throws Exception {
+    startSendJsonDataToKafka();
     BitSailConfiguration configuration = JobConfUtils.fromClasspath("kafka_to_print.json");
     updateConfiguration(configuration);
     EmbeddedFlinkCluster.submitJob(configuration);
   }

+  @Test
+  public void testKafkaSourcePbFormat() throws Exception {
+    BitSailConfiguration configuration = JobConfUtils.fromClasspath("kafka_to_print_pb_format.json");
+    URI proto = KafkaSourceITCase.class.getClassLoader().getResource("kafka.fds").toURI();
+    byte[] descriptor = IOUtils.toByteArray(new File(proto).toURI());
+
+    String protoDescriptor = Base64.getEncoder().encodeToString(descriptor);
+    configuration.set(RowParserOptions.PROTO_DESCRIPTOR, protoDescriptor);
+    configuration.set(RowParserOptions.PROTO_CLASS_NAME, "ProtoTest");
+    startSendPbDataToKafka(configuration);
+    updateConfiguration(configuration);
+    EmbeddedFlinkCluster.submitJob(configuration);
+  }
+
+  @Test
+  public void testKafkaSourcePbFormatFullTypes() throws Exception {
+    BitSailConfiguration configuration = JobConfUtils.fromClasspath("kafka_to_print_pb_format_full_types.json");
+    URI proto = KafkaSourceITCase.class.getClassLoader().getResource("kafka_full_types.fds").toURI();
+    byte[] descriptor = IOUtils.toByteArray(new File(proto).toURI());
+
+    String protoDescriptor = Base64.getEncoder().encodeToString(descriptor);
+    configuration.set(RowParserOptions.PROTO_DESCRIPTOR, protoDescriptor);
+    configuration.set(RowParserOptions.PROTO_CLASS_NAME, "ProtoTest");
+    startSendPbDataToKafka(configuration);
+    updateConfiguration(configuration);
+    EmbeddedFlinkCluster.submitJob(configuration);
+  }
+
   protected void updateConfiguration(BitSailConfiguration jobConfiguration) {
     // jobConfiguration.set(FakeReaderOptions.TOTAL_COUNT, TOTAL_SEND_COUNT);

bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.fds (new file)

Lines changed: 7 additions & 0 deletions

(Binary protobuf FileDescriptorSet; not human-readable. The legible fragments reference kafka.proto and a ProtoTest message with id, name, and date fields.)
bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka.proto (new file)

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
syntax = "proto3";

message ProtoTest {
  int64 ID = 1;
  string NAME = 2;
  string DATE = 3;
}
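
The kafka.fds resource above is the compiled, binary form of this schema. A hedged note — the commit does not record the exact invocation, but protoc's standard flag produces an equivalent FileDescriptorSet:

protoc --descriptor_set_out=kafka.fds kafka.proto

The same applies to kafka_full_types.fds and kafka_full_types.proto below.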

bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_full_types.fds

Lines changed: 20 additions & 0 deletions

(Binary protobuf FileDescriptorSet; not human-readable. The legible fragments reference kafka_full_types.proto and a ProtoTest message with fields field1 through field15.)
bitsail-connectors/bitsail-connectors-legacy/bitsail-connector-kafka/src/test/resources/kafka_full_types.proto (new file)

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
syntax = "proto3";

message ProtoTest {
  double field1 = 1;
  float field2 = 2;
  int32 field3 = 3;
  int64 field4 = 4;
  uint32 field5 = 5;
  uint64 field6 = 6;
  sint32 field7 = 7;
  sint64 field8 = 8;
  fixed32 field9 = 9;
  fixed64 field10 = 10;
  sfixed32 field11 = 11;
  sfixed64 field12 = 12;
  bool field13 = 13;
  string field14 = 14;
  bytes field15 = 15;
}
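
This full-types schema exercises every branch of the getColumnTypeInfo switch in PbDeserializationSchema. Read off that switch, the expected column types are (a summary of the mapping above, not new behavior):

// protobuf field type(s)                    JavaType       column type
// double, float                             DOUBLE, FLOAT  DOUBLE_COLUMN_TYPE_INFO
// int32, uint32, sint32, fixed32, sfixed32  INT            LONG_COLUMN_TYPE_INFO
// int64, uint64, sint64, fixed64, sfixed64  LONG           LONG_COLUMN_TYPE_INFO
// bool                                      BOOLEAN        BOOL_COLUMN_TYPE_INFO
// string                                    STRING         STRING_COLUMN_TYPE_INFO
// bytes                                     BYTE_STRING    BYTES_COLUMN_TYPE_INFO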
