
Commit 980cbd3

add support for full record (key+value) access (#88)
* add support for full record (key+value) access
  - deprecate existing serde methods from interfaces
  - add new serde methods tied type-wise to our default impl
  - provide default JSON serde implementations
  - add tests and utility classes for working with keyed decodable streams
  - rewrite existing / add new examples showing how to handle keyless vs keyed records
* add relocation to shadowJar task of example project
  this is necessary since the decodable flink runtime ships with relocated kafka-clients classes which otherwise couldn't be resolved on the classpath; the other option would be to relocate these classes in the SDK's build already, but this would still require some modification to this build, otherwise tests would fail locally
* apply same changes to flink 1.18
* apply same changes to flink 1.16
* apply same changes to flink 1.20
* adapt sql to work around limitations and make calcite parser in Flink 1.16 happy
* add necessary relocations for maven build of the example project
* remove some unused imports
* introduce serialization key/value fields constraint checking
  - allow for value-only constructor in DecodableAbstractStreamRecord
  - improve error message and exception handling in serde code
  - adapt usage in job test
* adapt example jobs to reflect recent sdk changes
* apply recent sdk changes for Flink 1.18
* apply recent sdk changes for Flink 1.20
* apply recent sdk changes for Flink 1.16
* minor constructor refactoring
* add test to example project showing how to validate POJO types for serialization
* follow method name convention for parameterized test
* refactor and add building blocks for handling append vs. change streams
* refactor and add tests
* adapt base types for example project
* consistent naming of things in tests
* refactor tests and use consistent namings
* add example job including a test for change stream processing
* add serializability-related tests for used POJO types
* clean up example jobs and use more consistent naming
* make sure to serialize record key as JSON only when key class is set and different from Void
* add example jobs with tests to show how to work with mixed append streams
  - keyed2keyless append stream
  - keyless2keyed append stream
* add explicit key checks for keyed append and change stream tests of example jobs
* use fixed xmx settings for both builds (maven+gradle)
* add explicit key checks for keyed append and change stream tests in the sdk
* apply latest code / test changes to flink version 1.20
* apply latest code / test changes to flink version 1.18
* apply latest code / test changes to flink version 1.16
1 parent afe6703 commit 980cbd3
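
In essence, the new full-record serde ties key and value types to the stream sources and sinks: a value-only schema for keyless streams, and a key+value schema for keyed ones. Below is a condensed sketch of the wiring, pulled together from the example jobs changed in this commit; the model classes (PurchaseOrder, OrderKey, KeyedPurchaseOrder, KeylessPurchaseOrder) live in the example project and are not redefined here.

// Condensed from the example jobs in this commit; not a complete, compilable file on its own.

// Keyless append stream: only a value type, record keys are neither read nor written.
DecodableStreamSink<KeylessPurchaseOrder> keylessSink = DecodableStreamSink.<KeylessPurchaseOrder>builder()
    .withStreamName("purchase-orders-processed")
    .withRecordSerializationSchema(new DecodableRecordSerializationSchema<>(PurchaseOrder.class))
    .build();

// Keyed stream: key and value types are both part of the record.
DecodableStreamSink<KeyedPurchaseOrder> keyedSink = DecodableStreamSink.<KeyedPurchaseOrder>builder()
    .withStreamName("purchase-orders-processed")
    .withRecordSerializationSchema(new DecodableRecordSerializationSchema<>(OrderKey.class, PurchaseOrder.class))
    .build();

// Reading a keyed stream works analogously via the full-record deserialization schema.
DecodableStreamSource<KeyedPurchaseOrder> keyedSource = DecodableStreamSource.<KeyedPurchaseOrder>builder()
    .withStreamName("purchase-orders")
    .withRecordDeserializationSchema(new DecodableRecordDeserializationSchema<>(KeyedPurchaseOrder.class))
    .build();
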

File tree

166 files changed: +8900 −185 lines


examples/custom-pipelines-hello-world/build.gradle

Lines changed: 8 additions & 2 deletions
@@ -13,13 +13,13 @@ plugins {
 }
 
 group = 'co.decodable.examples'
-version = '0.2'
+version = '0.3'
 
 ext {
     flinkVersion = '1.19.2'
     kafkaConnectorVersion = '3.3.0-1.19'
     log4jVersion = '2.17.1'
-    sdkVersion = '1.19.2-1.0.0.Beta9'
+    sdkVersion = '1.19.2-1.0.0-SNAPSHOT'
 }
 
 repositories {
@@ -116,6 +116,8 @@ shadowJar {
     }
 
     mergeServiceFiles()
+
+    relocate('org.apache.kafka.clients','org.apache.flink.kafka.shaded.org.apache.kafka.clients')
 }
 
 java {
@@ -125,3 +127,7 @@ java {
     withJavadocJar()
     withSourcesJar()
 }
+
+test {
+    maxHeapSize = "1024m"
+}

examples/custom-pipelines-hello-world/decodable-resources.yaml

Lines changed: 9 additions & 7 deletions
@@ -123,15 +123,17 @@ spec:
   type: JAVA
 
   # When building with Maven
-  # job_file_path: target/custom-pipelines-hello-world-0.2.jar
+  # job_file_path: target/custom-pipelines-hello-world-0.3.jar
   # When building with Gradle
-  job_file_path: build/libs/custom-pipelines-hello-world-0.2-all.jar
-
-  # When running the DataStream API Job
-  entry_class: co.decodable.examples.cpdemo.DataStreamJob
-  # When running the Table API Job
-  # entry_class: co.decodable.examples.cpdemo.TableAPIJob
+  job_file_path: build/libs/custom-pipelines-hello-world-0.3-all.jar
 
+  # Select which job to run from the JAR file
+  # NOTE: when running the keyed append stream or change stream based variants of the example jobs,
+  # you'd need to redefine both purchase order stream resource definitions above to reflect this. Also, you'd
+  # need to either feed the source stream from a CDC source or use another pipeline for pre-processing
+  # since the REST resource used in the example currently only supports ingestion into unkeyed append streams.
+  entry_class: co.decodable.examples.cpdemo.KeylessAppendStreamJob
+
   properties:
     flink_version: 1.19-java11
   execution:

examples/custom-pipelines-hello-world/pom.xml

Lines changed: 13 additions & 4 deletions
@@ -14,7 +14,7 @@
 
   <groupId>co.decodable.examples</groupId>
   <artifactId>custom-pipelines-hello-world</artifactId>
-  <version>0.2</version>
+  <version>0.3</version>
   <packaging>jar</packaging>
 
   <name>Decodable Pipeline SDK Example</name>
@@ -25,7 +25,7 @@
     <scala.binary.version>2.12</scala.binary.version>
     <maven.compiler.source>${target.java.version}</maven.compiler.source>
     <maven.compiler.target>${target.java.version}</maven.compiler.target>
-    <decodable.pipeline.sdk.version>1.19.2-1.0.0.Beta9</decodable.pipeline.sdk.version>
+    <decodable.pipeline.sdk.version>1.19.2-1.0.0-SNAPSHOT</decodable.pipeline.sdk.version>
     <flink.kafka.connector.version>3.3.0-1.19</flink.kafka.connector.version>
     <aws.msk.iam.auth.version>1.1.6</aws.msk.iam.auth.version>
     <flink.version>1.19.2</flink.version>
@@ -162,7 +162,7 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
-        <version>3.1.1</version>
+        <version>3.5.3</version>
         <executions>
           <!-- Run shade goal on package phase -->
           <execution>
@@ -179,10 +179,16 @@
                 <exclude>org.xerial.snappy:snappy-java</exclude>
               </excludes>
             </artifactSet>
+            <relocations>
+              <relocation>
+                <pattern>org.apache.kafka.clients</pattern>
+                <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka.clients</shadedPattern>
+              </relocation>
+            </relocations>
             <transformers>
               <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
               <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                <mainClass>co.decodable.examples.cpdemo.DataStreamJob</mainClass>
+                <mainClass>co.decodable.examples.cpdemo.KeylessAppendStreamJob</mainClass>
               </transformer>
             </transformers>
           </configuration>
@@ -197,6 +203,9 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <version>3.1.2</version>
+        <configuration>
+          <argLine>-Xmx1024m</argLine>
+        </configuration>
       </plugin>
     </plugins>
   </pluginManagement>

examples/custom-pipelines-hello-world/src/main/java/co/decodable/examples/cpdemo/ChangeStreamJob.java

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * Copyright Decodable, Inc.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package co.decodable.examples.cpdemo;

import co.decodable.examples.cpdemo.model.OrderKey;
import co.decodable.examples.cpdemo.model.PurchaseOrder;
import co.decodable.examples.cpdemo.model.change.KeyedPurchaseOrder;
import co.decodable.examples.cpdemo.model.change.PurchaseOrderEnvelope;
import co.decodable.sdk.pipeline.DecodableStreamSink;
import co.decodable.sdk.pipeline.DecodableStreamSource;
import co.decodable.sdk.pipeline.metadata.SinkStreams;
import co.decodable.sdk.pipeline.metadata.SourceStreams;
import co.decodable.sdk.pipeline.serde.DecodableRecordDeserializationSchema;
import co.decodable.sdk.pipeline.serde.DecodableRecordSerializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import static co.decodable.examples.cpdemo.ChangeStreamJob.PURCHASE_ORDERS_PROCESSED_STREAM;
import static co.decodable.examples.cpdemo.ChangeStreamJob.PURCHASE_ORDERS_STREAM;

// spotless:off
@SourceStreams(PURCHASE_ORDERS_STREAM) // @start region="custom-pipeline"
@SinkStreams(PURCHASE_ORDERS_PROCESSED_STREAM)
public class ChangeStreamJob {

    static final String PURCHASE_ORDERS_STREAM = "purchase-orders";
    static final String PURCHASE_ORDERS_PROCESSED_STREAM = "purchase-orders-processed";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // @highlight region regex=".*"
        DecodableStreamSource<KeyedPurchaseOrder> source = DecodableStreamSource.<KeyedPurchaseOrder>builder()
                .withStreamName(PURCHASE_ORDERS_STREAM)
                .withRecordDeserializationSchema(new DecodableRecordDeserializationSchema<>(KeyedPurchaseOrder.class))
                .build();

        DecodableStreamSink<KeyedPurchaseOrder> sink = DecodableStreamSink.<KeyedPurchaseOrder>builder()
                .withStreamName(PURCHASE_ORDERS_PROCESSED_STREAM)
                .withRecordSerializationSchema(new DecodableRecordSerializationSchema<>(OrderKey.class, PurchaseOrder.class))
                .build();
        // @end

        DataStream<KeyedPurchaseOrder> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
                        PURCHASE_ORDERS_STREAM)
                .map(new PurchaseOrderProcessor());

        stream.sinkTo(sink).name(PURCHASE_ORDERS_PROCESSED_STREAM);

        env.execute("purchase order processor with change streams");
    } // @end region="custom-pipeline"

    public static class PurchaseOrderProcessor extends RichMapFunction<KeyedPurchaseOrder, KeyedPurchaseOrder> {

        private static final long serialVersionUID = 1L;
        private Counter recordsProcessed;

        // @start region="metric-group"
        @Override
        public void open(Configuration parameters) throws Exception {
            recordsProcessed = getRuntimeContext()
                    .getMetricGroup()
                    .addGroup("DecodableMetrics")
                    .counter("recordsProcessed", new SimpleCounter());
        }
        // @end region="metric-group"

        @Override
        public KeyedPurchaseOrder map(KeyedPurchaseOrder original) throws Exception {
            var envelope = original.getValue();
            var processed = new KeyedPurchaseOrder(
                    original.getKey(),
                    new PurchaseOrderEnvelope(
                            // NOTE:
                            // This can only work for truly stateless transformations otherwise explicit
                            // state per key handling is necessary to retrieve the before state accordingly.
                            processOrder(envelope.getBefore()),
                            processOrder(envelope.getAfter()),
                            envelope.getOp(),
                            envelope.getTs_ms()
                    )
            );
            recordsProcessed.inc();
            return processed;
        }

        private static PurchaseOrder processOrder(PurchaseOrder original) {
            return original == null ? null : new PurchaseOrder(
                    original.orderId,
                    original.orderDate,
                    original.customerName.toUpperCase(),
                    original.price,
                    original.productId,
                    original.orderStatus
            );
        }
    }
}
//spotless:on
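
The change-stream model types referenced above (KeyedPurchaseOrder, PurchaseOrderEnvelope) are not included in this excerpt. What follows is a minimal sketch of the envelope, inferred purely from how ChangeStreamJob calls it; field names, types, and constructors are assumptions, and the keyed wrapper presumably just pairs an OrderKey with this envelope (the commit message mentions a value-only constructor on DecodableAbstractStreamRecord for the keyless case).

package co.decodable.examples.cpdemo.model.change;

import co.decodable.examples.cpdemo.model.PurchaseOrder;

// Hypothetical sketch of the change envelope used above (not shown in this excerpt);
// the real class in the example project may differ in fields, types, and annotations.
public class PurchaseOrderEnvelope {

    private PurchaseOrder before; // state before the change, null for inserts
    private PurchaseOrder after;  // state after the change, null for deletes
    private String op;            // change operation, e.g. "c", "u", "d"
    private long ts_ms;           // change timestamp in epoch millis

    public PurchaseOrderEnvelope() { } // no-arg constructor for JSON/POJO serde

    public PurchaseOrderEnvelope(PurchaseOrder before, PurchaseOrder after, String op, long ts_ms) {
        this.before = before;
        this.after = after;
        this.op = op;
        this.ts_ms = ts_ms;
    }

    public PurchaseOrder getBefore() { return before; }
    public PurchaseOrder getAfter() { return after; }
    public String getOp() { return op; }
    public long getTs_ms() { return ts_ms; }
}
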
examples/custom-pipelines-hello-world/src/main/java/co/decodable/examples/cpdemo/Keyed2KeylessAppendStreamJob.java

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * Copyright Decodable, Inc.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package co.decodable.examples.cpdemo;

import co.decodable.examples.cpdemo.model.OrderKey;
import co.decodable.examples.cpdemo.model.PurchaseOrder;
import co.decodable.examples.cpdemo.model.append.KeyedPurchaseOrder;
import co.decodable.examples.cpdemo.model.append.KeylessPurchaseOrder;
import co.decodable.sdk.pipeline.DecodableStreamSink;
import co.decodable.sdk.pipeline.DecodableStreamSource;
import co.decodable.sdk.pipeline.metadata.SinkStreams;
import co.decodable.sdk.pipeline.metadata.SourceStreams;
import co.decodable.sdk.pipeline.serde.DecodableRecordDeserializationSchema;
import co.decodable.sdk.pipeline.serde.DecodableRecordSerializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import static co.decodable.examples.cpdemo.KeylessAppendStreamJob.PURCHASE_ORDERS_PROCESSED_STREAM;
import static co.decodable.examples.cpdemo.KeylessAppendStreamJob.PURCHASE_ORDERS_STREAM;

@SourceStreams(PURCHASE_ORDERS_STREAM)
@SinkStreams(PURCHASE_ORDERS_PROCESSED_STREAM)
public class Keyed2KeylessAppendStreamJob {

    static final String PURCHASE_ORDERS_PROCESSED_STREAM = "purchase-orders-processed";
    static final String PURCHASE_ORDERS_STREAM = "purchase-orders";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DecodableStreamSource<KeyedPurchaseOrder> source =
                DecodableStreamSource.<KeyedPurchaseOrder>builder()
                        .withStreamName(PURCHASE_ORDERS_STREAM)
                        .withRecordDeserializationSchema(new DecodableRecordDeserializationSchema<>(KeyedPurchaseOrder.class))
                        .build();

        DecodableStreamSink<KeylessPurchaseOrder> sink =
                DecodableStreamSink.<KeylessPurchaseOrder>builder()
                        .withStreamName(PURCHASE_ORDERS_PROCESSED_STREAM)
                        .withRecordSerializationSchema(new DecodableRecordSerializationSchema<>(PurchaseOrder.class))
                        .build();

        DataStream<KeylessPurchaseOrder> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(),
                                PURCHASE_ORDERS_STREAM)
                        .map(new NameConverter());

        stream.sinkTo(sink)
                .name(PURCHASE_ORDERS_PROCESSED_STREAM);

        env.execute("purchase order processor reading keyed/writing keyless append stream");
    }

    public static class NameConverter extends RichMapFunction<KeyedPurchaseOrder, KeylessPurchaseOrder> {

        private static final long serialVersionUID = 1L;

        private Counter recordsProcessed;

        @Override
        public void open(Configuration parameters) throws Exception {
            recordsProcessed = getRuntimeContext()
                    .getMetricGroup()
                    .addGroup("DecodableMetrics")
                    .counter("recordsProcessed", new SimpleCounter());
        }

        @Override
        public KeylessPurchaseOrder map(KeyedPurchaseOrder order) throws Exception {
            var orderValue = order.getValue();
            var newOrder = new PurchaseOrder(
                    orderValue.orderId,
                    orderValue.orderDate,
                    orderValue.customerName.toUpperCase(),
                    orderValue.price,
                    orderValue.productId,
                    orderValue.orderStatus
            );
            recordsProcessed.inc();
            return new KeylessPurchaseOrder(newOrder);
        }
    }
}
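
The commit message also mentions tests that validate the POJO types used for serialization. A minimal sketch of such a check, assuming Flink's PojoTestUtils from the flink-core test utilities and JUnit 5; the tests actually added by this commit may be structured differently.

package co.decodable.examples.cpdemo;

import co.decodable.examples.cpdemo.model.PurchaseOrder;
import org.apache.flink.types.PojoTestUtils;
import org.junit.jupiter.api.Test;

// Minimal sketch, assuming PojoTestUtils (shipped with the flink-core test-jar) is on the test classpath.
class PojoSerializationTest {

    @Test
    void purchaseOrderIsSerializedAsPojo() {
        // Fails if Flink would fall back to Kryo for this type instead of using the POJO serializer.
        PojoTestUtils.assertSerializedAsPojo(PurchaseOrder.class);
    }
}
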
