
Commit 03f1181

Upgrade to Flink 2.0 (#10)
1 parent 87e87a8 commit 03f1181

56 files changed: +2913 additions, -1436 deletions


.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion

```diff
@@ -23,7 +23,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        flink: [ "1.17.2", "1.18.1", "1.19.0" ]
+        flink: [ "2.0.0" ]
         jdk: [ "11" ]
     steps:
       - uses: actions/checkout@v3
```

CHANGELOG.md

Lines changed: 2 additions & 0 deletions

```diff
@@ -2,6 +2,8 @@

 ## [Unreleased]

+- Upgrade to Flink 2.0
+
 ## [0.2.1] - 2024-04-11

 ## [0.2.0] - 2023-11-22
```

README.md

Lines changed: 147 additions & 0 deletions

@@ -1 +1,148 @@

# flink-connector-jdbc-elasticsearch-dialect

This module contains the Flink JDBC dialect for Elasticsearch.

## Elasticsearch Catalog

This is an implementation of a [Flink Catalog](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/catalogs/) for [Elastic](https://www.elastic.co/).

---

### Possible Operations

- `listDatabases` Lists databases in a catalog.
- `databaseExists` Checks if a database exists.
- `listTables` Lists tables in a database.
- `tableExists` Checks if a table exists.
- `getTable` Gets the metadata information about a table, consisting of the table schema and table properties. Among others, the table properties contain the `CONNECTOR`, `BASE_URL`, `TABLE_NAME` and `SCAN_PARTITION` options.

---

### Scan options

If we want tables in a catalog to be partitioned by a column, we should specify scan options. It is possible to set up [Scan options](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/jdbc/#scan-partition-column:~:text=than%201%20second.-,scan.partition.column,-optional) while defining a catalog.

There are two types of scan options for the Elastic Catalog:

#### Default scan options for a catalog

We can specify default partitioning options for all tables in a catalog. If no options for a table are specified, these options are used to select the column for partitioning, and the number of partitions for a table is calculated based on the catalog default option (see the sketch after the list below).

- `catalog.default.scan.partition.column.name` Specifies which column to use for table partitioning by default. The default option is used for all tables in a catalog. We can override the column used for partitioning a particular table by specifying table specific scan options.
- `catalog.default.scan.partition.size` Specifies how many elements should be placed in a single partition. The number of partitions is calculated from the number of elements and the default partition size. If we want a particular table to have an exact number of partitions, we can specify that number using table specific scan options.
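
A minimal sketch of the catalog default options, assuming a numeric `id` column and roughly 100000 rows per partition (both values are illustrative, not connector defaults):

```properties
# Hypothetical defaults: partition every table on its "id" column,
# aiming for about 100000 elements per partition.
catalog.default.scan.partition.column.name=id
catalog.default.scan.partition.size=100000
```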

#### Table specific scan options

These options are useful if we know that not all tables in a catalog should be partitioned in the same way. Here we can specify partitioning options for selected tables.

- `properties.scan.{tablename}.partition.column.name` Specifies the name of the column to use for partitioning a table. Corresponds to the `scan.partition.column` option.
- `properties.scan.{tablename}.partition.number` Specifies the number of partitions for a table. Corresponds to the `scan.partition.num` option.

For both options above, we should replace `{tablename}` with the name of the table that the options should apply to; we can provide these options for multiple tables, as in the sketch below.
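
For example, a sketch that partitions only a hypothetical table named `logs` (the table name, column and partition count are assumptions for illustration):

```properties
# Hypothetical table specific options: partition the "logs" table
# on its "timestamp" column into exactly 10 partitions.
properties.scan.logs.partition.column.name=timestamp
properties.scan.logs.partition.number=10
```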

#### Index patterns

If we specify an index pattern, a Flink table will be created in the catalog that, instead of targeting a single index in Elastic, targets all indexes matching the provided pattern. This is useful when we want to write Flink SQL that reads similar data from many similar tables instead of a single one. The resulting Flink table will contain all columns found in the matching tables and will use all the data from those tables. This table will have the same name as the pattern.

- `properties.index.patterns` Specifies the patterns for which we want Flink tables to be created. We can specify multiple index patterns by separating them with a comma `,` sign, as in the sketch below.

The Flink tables created this way can also be partitioned just like other Flink tables, by providing default catalog scan options or table specific scan options.
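
A sketch with assumed pattern values (the index names and the exact pattern syntax shown here are illustrative only):

```properties
# Hypothetical patterns: one Flink table covering all "logs-*" indexes
# and another covering all "metrics-*" indexes.
properties.index.patterns=logs-.*,metrics-.*
```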

#### Time attributes

It is possible to add a `proctime` column to each catalog table:

```properties
catalog.add-proctime-column=true
```

---

### Rules for overwriting catalog scan options

#### No scan options were provided

It is not necessary to provide either default scan options for a catalog or table specific scan options. If no scan options are provided, no tables in the catalog will be partitioned.

#### Only default scan options for a catalog were provided

If only default catalog scan options were provided, all tables in the catalog will be partitioned in a similar way: the same column name is used for partitioning all tables, and the number of partitions for each table depends on the number of records in that table. All tables will have the same maximum number of elements per partition.

#### Only table specific scan options were provided

If we want a specific table to be partitioned and leave the rest of the tables non-partitioned, we have to provide both table specific scan options for that table.

#### Both catalog default scan options and table specific scan options were provided

Table specific scan options have higher priority than catalog default scan properties when deciding how to partition a table. If we specify both a catalog default partition column name and a table specific partition column name, the table specific partition column name is used. The same applies when we specify a catalog default scan partition size and a table specific partition number: instead of calculating the number of partitions for the table from the element count, the table will have exactly the number of partitions provided for it, as illustrated below.
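
A sketch combining both levels, with assumed values; here the table specific options win for a hypothetical `orders` table, while every other table falls back to the catalog defaults:

```properties
# Catalog-wide defaults (hypothetical values).
catalog.default.scan.partition.column.name=id
catalog.default.scan.partition.size=100000

# Table specific overrides for a table named "orders" (hypothetical values);
# these take precedence over the defaults above for that table.
properties.scan.orders.partition.column.name=order_ts
properties.scan.orders.partition.number=16
```
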
---

### Calculation of scan partition bounds

If a table is partitioned, meaning that we specified catalog default scan options or table specific scan options, the upper and lower bounds will be calculated. As specified in the Flink documentation, the `properties.scan.{tablename}.partition.column.name` option works for numeric and temporal data types. The `scan.partition.lower-bound` will be calculated as the lowest value in the table, and the `scan.partition.upper-bound` as the highest value in the table; the example below illustrates the result.
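
As an illustration of the derived per-table options, assume a table partitioned on a numeric `id` column whose values range from 1 to 1000000, split into 16 partitions (all values are hypothetical):

```properties
# Hypothetical effective scan options derived by the catalog for one table.
scan.partition.column=id
scan.partition.num=16
scan.partition.lower-bound=1
scan.partition.upper-bound=1000000
```
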
---

### Note that

If we want a table to be partitioned, we must provide both a catalog default or table specific option for the partition column to use, and a catalog default partition size or table specific partition number option for deciding how many partitions to use for the table. If only one of these is provided, we will receive an error.

---

### Implementation details

`com.getindata.flink.connector.jdbc.elasticsearch.database.catalog.ElasticsearchJdbcCatalogFactory` - has been copied because the default CatalogFactory does not allow passing custom catalog properties.

`com.getindata.flink.connector.jdbc.elasticsearch.database.catalog.CopiedAbstractJdbcCatalog` is a copy of `org.apache.flink.connector.jdbc.core.database.catalog.AbstractJdbcCatalog` where the JDBC validation is modified.

pom.xml

Lines changed: 31 additions & 10 deletions

```diff
@@ -41,9 +41,9 @@
         <scala.binary.version>2.12</scala.binary.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

-        <elasticsearch.version>8.11.1</elasticsearch.version>
-        <flink.version>1.18.1</flink.version>
-        <flink-connector-jdbc.version>3.1.2-1.18</flink-connector-jdbc.version>
+        <elasticsearch.version>8.19.0</elasticsearch.version>
+        <flink.version>2.0.0</flink.version>
+        <flink-connector-jdbc.version>4.0.0-2.0</flink-connector-jdbc.version>
         <jackson.version>2.15.2</jackson.version>

         <!-- Test -->
@@ -60,6 +60,7 @@
         <junit.version>5.9.1</junit.version>
         <logback.version>1.3.5</logback.version>
         <mockito.version>2.21.0</mockito.version>
+        <okhttp.version>4.10.0</okhttp.version>
         <slf4j-api.version>2.0.4</slf4j-api.version>
         <testcontainers.version>1.18.2</testcontainers.version>
     </properties>
@@ -192,7 +193,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-jdbc</artifactId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc-core</artifactId>
             <version>${flink-connector-jdbc.version}</version>
         </dependency>

@@ -204,7 +211,21 @@
             <scope>provided</scope>
         </dependency>

+        <!-- Logging dependencies -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j-api.version}</version>
+        </dependency>
+
         <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc-core</artifactId>
+            <version>${flink-connector-jdbc.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.junit</groupId>
             <artifactId>junit-bom</artifactId>
@@ -251,12 +272,6 @@
             <version>${logback.version}</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <version>${slf4j-api.version}</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
@@ -343,6 +358,12 @@
             <version>${jackson.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>

     <profiles>
```

Lines changed: 19 additions & 5 deletions

```diff
@@ -16,20 +16,34 @@
  * limitations under the License.
  */

-package com.getindata.flink.connector.jdbc.databases.elasticsearch.dialect;
+package com.getindata.flink.connector.jdbc.elasticsearch.database;

-import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+import com.getindata.flink.connector.jdbc.elasticsearch.database.dialect.ElasticsearchDialect;
+import org.apache.flink.connector.jdbc.core.database.JdbcFactory;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;

-public class ElasticsearchDialectFactory implements JdbcDialectFactory {
+public class ElasticsearchFactory implements JdbcFactory {

     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:elasticsearch:") || url.startsWith("jdbc:es:");
     }

     @Override
-    public JdbcDialect create() {
+    public JdbcDialect createDialect() {
         return new ElasticsearchDialect();
     }
+
+    @Override
+    public JdbcCatalog createCatalog(
+            ClassLoader classLoader,
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        throw new UnsupportedOperationException("Use elasticsearch catalog factory.");
+    }
+
 }
```
