This module contains Flink JDBC dialect for Elasticsearch.
This is an implementation of a Flink Catalog for Elastic.
- listDatabasesLists Databases in a catalog.
- databaseExistsChecks if a database exists.
- listTablesLists Tables in a Database.
- tableExistsChecks if a table exists.
- getTableGets the metadata information about the table. This consists of table schema and table properties. Table properties among others contain- CONNECTOR,- BASE_URL,- TABLE_NAMEand- SCAN_PARTITIONoptions.
If we want tables in a catalog to be partitioned by a column we should specify scan options. It is possible to set up Scan options while defining a catalog.
There are 2 types of scan options for Elastic Catalog:
We can specify default partitioning options for all tables in a catalog. If no options for a table are specified, these options will be used to select a column for partitioning and the number of partitions for a table will be calculated based on catalog default option.
- catalog.default.scan.partition.column.nameSpecify what column to use for table partitioning by default. The default option will be used for all tables in a catalog. We can overwrite a column to use for partitioning of a table by specifying table specific scan options.
- catalog.default.scan.partition.sizeSpecify how many elements should be placed in a single partition. The number of partitions will be calculated based on the number of elements and the default size of a partition. If we want a particular table to have an exact number of partitions, we can specify that number using table specific scan options.
These options can be useful if we know that not all tables in a catalog should be partitioned in the same way. Here we can specify partitioning options for selected tables.
- properties.scan.{tablename}.partition.column.nameSpecify the name of the column to use for partitioning of a table. Corresponds to the- scan.partition.columnoption.
- properties.scan.{tablename}.partition.numberSpecify the number of partitions for a table. Corresponds to the- scan.partition.numoption.
For both of options specified above we should replace {tablename} with the name of the table that we want the options
to apply to.
We can provide these options for multiple tables.
If we specify an index pattern, a Flink table will be created in Catalog that instead of targeting a single index in Elastic will target all indexes that match the pattern provided. It is useful to use if we want to write Flink SQL that reads similar data from many similar tables instead of a single one. The resulting Flink table will contain all columns found in matching tables and will use all the data from matching tables. This table will have the same name as the pattern.
- properties.index.patternsSpecify patterns for which we want to create Flink tables. We can specify multiple index patterns by separating them with a comma- ,sign.
The Flink tables created this way can also be partitioned just as other Flink tables by providing default catalog scan options or table specific scan options.
It is possible to add proctime column to each catalog table.
catalog.add-proctime-column=trueThere is no necessity to provide either default scan options for a catalog or table specific scan options. If there are no scan options provided no tables in a catalog will be partitioned.
If only default catalog scan options were provided, all tables in a catalog will be partitioned in a similar way. The same column name for table partitioning for all tables and the number of partitions for tables will be dependant on the number of records in a table. All tables will have the same maximum number of elements in a partition.
If we want a specific table to be partitioned and leave the rest of tables nonpartitioned we have to provide both table specific scan options.
Table specific scan options have higher priority over catalog default scan properties when deciding how to partition a table. If we specify catalog default partition column name and a table specific partition column name then table specific partition column name is taken into account. Similar thing happens when we specify catalog default scan partition size and table specific partition number. Instead of calculating the number of partitions for a table based on the count of elements, the table will have the number of partitions equal to the one provided for a table.
If a table is partitioned, meaning that we specified catalog default scan options or we specified table specific scan
options the upper and lower bounds will be calculated.
As specified in the Flink documentation, the properties.scan.{tablename}.partition.column.name option works for
numeric and temporal data types.
The scan.partition.lower-bound will be calculated as the lowest value in the table.
The scan.partition.upper-bound will be calculated as the highest value in the table.
If we want a table to be partitioned it is necessary that we provide a catalog default or table specific option for partition column to use and catalog default or table specific partition number option for deciding how many partitions to use for a table. If only 1 option is provided we will receive an error.
com.getindata.flink.connector.jdbc.elasticsearch.database.catalog.ElasticsearchJdbcCatalogFactory - has been copied
because default CatalogFactory does not allow to pass custom catalog properties.
com.getindata.flink.connector.jdbc.elasticsearch.database.catalog.CopiedAbstractJdbcCatalog is a copy of
org.apache.flink.connector.jdbc.core.database.catalog.AbstractJdbcCatalog where JDBC validation is modified.