-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
17 changed files
with
1,183 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -244,7 +244,7 @@ services: | |
working_dir: /opt/flink | ||
ports: | ||
- "8081:8081" | ||
- "8082:8081" | ||
- "6123:6123" | ||
|
||
environment: | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
### Java ### | ||
# Compiled class file | ||
*.class | ||
|
||
# Log file | ||
*.log | ||
|
||
# BlueJ files | ||
*.ctxt | ||
|
||
# Mobile Tools for Java (J2ME) | ||
.mtj.tmp/ | ||
|
||
# Package Files # | ||
*.jar | ||
*.war | ||
*.nar | ||
*.ear | ||
*.zip | ||
*.tar.gz | ||
*.rar | ||
|
||
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml | ||
hs_err_pid* | ||
replay_pid* | ||
|
||
### Maven ### | ||
target/ | ||
pom.xml.tag | ||
pom.xml.releaseBackup | ||
pom.xml.versionsBackup | ||
pom.xml.next | ||
release.properties | ||
dependency-reduced-pom.xml | ||
buildNumber.properties | ||
.mvn/timing.properties | ||
# https://github.com/takari/maven-wrapper#usage-without-binary-jar | ||
.mvn/wrapper/maven-wrapper.jar | ||
|
||
# Eclipse m2e generated files | ||
# Eclipse Core | ||
.project | ||
# JDT-specific (Eclipse Java Development Tools) | ||
.classpath |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<parent> | ||
<groupId>com.github.lambda.lakehouse</groupId> | ||
<artifactId>project-flink</artifactId> | ||
<version>1.0-SNAPSHOT</version> | ||
<relativePath>../pom.xml</relativePath> | ||
</parent> | ||
|
||
<artifactId>flink-sql-iceberg</artifactId> | ||
|
||
<properties> | ||
</properties> | ||
|
||
<dependencies> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-json</artifactId> | ||
<version>${dep.version.flink}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-streaming-java</artifactId> | ||
<version>${dep.version.flink}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-table-api-java</artifactId> | ||
<version>${dep.version.flink}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-table-planner_2.12</artifactId> | ||
<version>${dep.version.flink}</version> | ||
</dependency> | ||
|
||
<!-- Parquet Dependency --> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-connector-files</artifactId> | ||
<version>${dep.version.flink}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-parquet</artifactId> | ||
<version>${dep.version.flink}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-s3-fs-hadoop</artifactId> | ||
<version>${dep.version.flink}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<!-- Kafka Dependency --> | ||
|
||
<!-- flink-connector-kafka was declared twice (once without a scope, once as
     provided). Maven warns about duplicate declarations and keeps the last
     one, so only the provided-scoped declaration is retained here. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>${dep.version.flink}</version>
  <scope>provided</scope>
</dependency>
|
||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-sql-connector-kafka</artifactId> | ||
<version>${dep.version.flink}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.kafka</groupId> | ||
<artifactId>kafka-clients</artifactId> | ||
<version>${dep.version.kafka}</version> | ||
</dependency> | ||
|
||
<!-- Iceberg Dependency --> | ||
<dependency> | ||
<groupId>org.apache.iceberg</groupId> | ||
<artifactId>iceberg-flink-runtime-${dep.version.flinkShort}</artifactId> | ||
<version>${dep.version.iceberg}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.iceberg</groupId> | ||
<artifactId>iceberg-hive-runtime</artifactId> | ||
<version>${dep.version.iceberg}</version> | ||
</dependency> | ||
|
||
<!-- Extra Dependency --> | ||
<dependency> | ||
<groupId>software.amazon.awssdk</groupId> | ||
<artifactId>bundle</artifactId> | ||
<version>${dep.version.awssdk}</version> | ||
</dependency> | ||
|
||
<!-- Hadoop Dependency --> | ||
<dependency> | ||
<groupId>org.apache.hadoop</groupId> | ||
<artifactId>hadoop-mapreduce-client-core</artifactId> | ||
<version>${dep.version.hadoop}</version> | ||
<exclusions> | ||
<exclusion> | ||
<groupId>com.sun.jersey</groupId> | ||
<artifactId>jersey-core</artifactId> | ||
</exclusion> | ||
</exclusions> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.hadoop</groupId> | ||
<artifactId>hadoop-mapreduce-client-common</artifactId> | ||
<version>${dep.version.hadoop}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.hadoop</groupId> | ||
<artifactId>hadoop-mapreduce-client-jobclient</artifactId> | ||
<version>${dep.version.hadoop}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.hadoop</groupId> | ||
<artifactId>hadoop-hdfs</artifactId> | ||
<version>${dep.version.hadoop}</version> | ||
<exclusions> | ||
<exclusion> | ||
<groupId>com.sun.jersey</groupId> | ||
<artifactId>jersey-core</artifactId> | ||
</exclusion> | ||
</exclusions> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.hadoop</groupId> | ||
<artifactId>hadoop-common</artifactId> | ||
<version>${dep.version.hadoop}</version> | ||
<exclusions> | ||
<exclusion> | ||
<groupId>com.sun.jersey</groupId> | ||
<artifactId>jersey-core</artifactId> | ||
</exclusion> | ||
</exclusions> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.hadoop</groupId> | ||
<artifactId>hadoop-aws</artifactId> | ||
<version>${dep.version.hadoop}</version> | ||
</dependency> | ||
|
||
<!-- Hive Dependency --> | ||
<dependency> | ||
<groupId>org.apache.flink</groupId> | ||
<artifactId>flink-connector-hive_2.12</artifactId> | ||
<version>${dep.version.flink}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.hive</groupId> | ||
<artifactId>hive-exec</artifactId> | ||
<version>${dep.version.hive}</version> | ||
<exclusions> | ||
<exclusion> | ||
<groupId>org.apache.orc</groupId> | ||
<artifactId>orc-core</artifactId> | ||
</exclusion> | ||
</exclusions> | ||
</dependency> | ||
|
||
</dependencies> | ||
|
||
<build> | ||
<plugins> | ||
<!-- The maven-shade plugin creates a fat jar that contains all | ||
dependencies. --> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-shade-plugin</artifactId> | ||
<version>3.4.1</version> | ||
<configuration> | ||
<shadedArtifactAttached>true</shadedArtifactAttached> | ||
<shadedClassifierName>combined</shadedClassifierName> | ||
</configuration> | ||
<executions> | ||
<execution> | ||
<phase>package</phase> | ||
<goals> | ||
<goal>shade</goal> | ||
</goals> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-compiler-plugin</artifactId> | ||
<configuration> | ||
<source>8</source> | ||
<target>8</target> | ||
</configuration> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
</project> |
98 changes: 98 additions & 0 deletions
98
...k/flink-sql-iceberg/src/main/java/com/github/lambda/lakehouse/FlinkAppKafkaToIceberg.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
package com.github.lambda.lakehouse; | ||
|
||
import org.apache.flink.configuration.Configuration; | ||
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; | ||
import org.apache.flink.streaming.api.CheckpointingMode; | ||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | ||
import org.apache.flink.table.api.Table; | ||
import org.apache.flink.table.api.TableEnvironment; | ||
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class FlinkAppKafkaToIceberg { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(FlinkAppKafkaToIceberg.class); | ||
|
||
public static void main(String[] args) throws Exception { | ||
|
||
TableEnvironment tableEnv = buildTableEnvironment(); | ||
|
||
Table tableRawCustomers = buildSourceTable("raw_customers", tableEnv); | ||
Table tableAggrCustomers = buildSinkTable("inventory", "sink_customers", tableEnv); | ||
|
||
tableEnv.executeSql("INSERT INTO sink_customers SELECT id, weight FROM raw_customers"); | ||
} | ||
|
||
public static Table buildSinkTable(String dbName, String tableName, TableEnvironment tableEnv) { | ||
String query = "" | ||
+ "CREATE TABLE " + dbName + "." + tableName + " (\n" | ||
+ " id BIGINT,\n" | ||
+ " weight DECIMAL(38, 10),\n" | ||
+ " PRIMARY KEY (id) NOT ENFORCED\n" | ||
+ ") " | ||
+ "WITH (\n" | ||
+ " 'connector' = 'iceberg',\n" | ||
+ " 'catalog-name' = 'hive_prod',\n" | ||
+ " 'catalog-database' = '" + dbName + "',\n" | ||
+ " 'catalog-table' = 'customers',\n" | ||
+ " 'uri' = 'thrift://localhost:9083',\n" | ||
+ " 'warehouse' = 's3a://datalake'\n" | ||
+ ");\n"; | ||
tableEnv.executeSql(query); | ||
tableEnv.executeSql("SHOW CREATE TABLE " + dbName + "." + tableName).print(); | ||
|
||
Table table = tableEnv.from(dbName + "." + tableName); | ||
|
||
return table; | ||
} | ||
|
||
public static Table buildSourceTable(String tableName, TableEnvironment tableEnv) { | ||
String query = "" | ||
+ "CREATE TABLE " + tableName + " (\n" | ||
+ " origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,\n" | ||
+ " event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,\n" | ||
+ " origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,\n" | ||
+ " origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,\n" | ||
+ " origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,\n" | ||
+ " origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,\n" | ||
+ " id BIGINT,\n" | ||
+ " name STRING,\n" | ||
+ " description STRING,\n" | ||
+ " weight DECIMAL(38, 10)\n" + ") " | ||
+ "WITH (\n" | ||
+ " 'connector' = 'kafka',\n" | ||
+ " 'topic' = 'cdc-json.inventory.data.inventory.customers',\n" | ||
+ " 'properties.bootstrap.servers' = 'localhost:9092',\n" | ||
+ " 'properties.group.id' = 'testGroup',\n" | ||
+ " 'properties.auto.offset.reset' = 'earliest',\n" | ||
+ " 'scan.startup.mode' = 'earliest-offset',\n" | ||
+ " 'format' = 'debezium-json',\n" | ||
+ " 'debezium-json.schema-include' = 'true',\n" | ||
+ " 'debezium-json.ignore-parse-errors' = 'false'\n" | ||
+ ");\n"; | ||
tableEnv.executeSql(query); | ||
tableEnv.executeSql("SHOW CREATE TABLE " + tableName).print(); | ||
|
||
Table table = tableEnv.from(tableName); | ||
|
||
return table; | ||
} | ||
|
||
public static StreamTableEnvironment buildTableEnvironment() { | ||
// TODO (Kun): Handle Parameters | ||
// - https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/application_parameters/ | ||
Configuration conf = new Configuration(); | ||
StreamExecutionEnvironment env = StreamExecutionEnvironment | ||
.createLocalEnvironmentWithWebUI(conf); | ||
env.getCheckpointConfig().setCheckpointInterval(10000L); | ||
// env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); | ||
// env.setStateBackend(new EmbeddedRocksDBStateBackend()); | ||
// env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoint"); | ||
// env.setDefaultSavepointDirectory("file:///tmp/flink-savepoint"); | ||
|
||
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); | ||
|
||
return tableEnv; | ||
} | ||
} |
Oops, something went wrong.