diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java index d8f270a8dbef6..653d15286203c 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/SqlClientTest.java @@ -214,6 +214,14 @@ void testExecuteSqlFile() throws Exception { } } + @Test + void testExecuteNexmark() throws Exception { + final URL sqlFile = getClass().getClassLoader().getResource("nexmark.sql"); + String[] args = new String[] {"-f", sqlFile.getPath()}; + String output = runSqlClient(args); + assertThat(output).doesNotContain("java.lang.AssertionError"); + } + @Test void testDisplayMultiLineSqlInInteractiveMode() throws Exception { List statements = diff --git a/flink-table/flink-sql-client/src/test/resources/nexmark.sql b/flink-table/flink-sql-client/src/test/resources/nexmark.sql new file mode 100644 index 0000000000000..5475f471dd9f8 --- /dev/null +++ b/flink-table/flink-sql-client/src/test/resources/nexmark.sql @@ -0,0 +1,116 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +CREATE TABLE datagen +( + event_type int, + person ROW< + id BIGINT, + name VARCHAR, + emailAddress VARCHAR, + creditCard VARCHAR, + city VARCHAR, + state VARCHAR, + `dateTime` TIMESTAMP(3), + extra VARCHAR>, + auction ROW< + id BIGINT, + itemName VARCHAR, + description VARCHAR, + initialBid BIGINT, + reserve BIGINT, + `dateTime` TIMESTAMP(3), + expires TIMESTAMP(3), + seller BIGINT, + category BIGINT, + extra VARCHAR>, + bid ROW< + auction BIGINT, + bidder BIGINT, + price BIGINT, + channel VARCHAR, + url VARCHAR, + `dateTime` TIMESTAMP(3), + extra VARCHAR>, + `dateTime` AS + CASE + WHEN event_type = 0 THEN person.`dateTime` + WHEN event_type = 1 THEN auction.`dateTime` + ELSE bid.`dateTime` + END, + WATERMARK FOR `dateTime` AS `dateTime` - INTERVAL '4' SECOND +) WITH ( + 'connector' = 'datagen', + 'number-of-rows' = '10' +); +CREATE VIEW person AS +SELECT person.id, + person.name, + person.emailAddress, + person.creditCard, + person.city, + person.state, + `dateTime`, + person.extra +FROM datagen +WHERE event_type = 0; + +CREATE VIEW auction AS +SELECT auction.id, + auction.itemName, + auction.description, + auction.initialBid, + auction.reserve, + `dateTime`, + auction.expires, + auction.seller, + auction.category, + auction.extra +FROM datagen +WHERE event_type = 1; + +CREATE VIEW bid AS +SELECT bid.auction, + bid.bidder, + bid.price, + bid.channel, + bid.url, + `dateTime`, + bid.extra +FROM datagen +WHERE event_type = 2; + + +CREATE TABLE nexmark_q7 +( + auction BIGINT, + bidder BIGINT, + price BIGINT, + `dateTime` TIMESTAMP(3), + extra VARCHAR +) WITH ( + 'connector' = 'blackhole' +); + +INSERT INTO nexmark_q7 +SELECT B.auction, B.price, B.bidder, B.`dateTime`, B.extra +from bid B + JOIN (SELECT MAX(price) AS maxprice, window_end as `dateTime` + FROM TABLE( + TUMBLE(TABLE bid, DESCRIPTOR(`dateTime`), INTERVAL '10' SECOND)) + GROUP BY window_start, window_end) B1 + ON B.price = B1.maxprice +WHERE B.`dateTime` BETWEEN B1.`dateTime` - INTERVAL '10' SECOND AND B1.`dateTime`; diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/EnvironmentReusableInMemoryCatalog.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/EnvironmentReusableInMemoryCatalog.java new file mode 100644 index 0000000000000..ae8efc1a3e746 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/EnvironmentReusableInMemoryCatalog.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.context; + +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.QueryOperationCatalogView; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; + +import java.util.Optional; + +/** + * An in-memory catalog that can be reused across different {@link TableEnvironment}. The SQL client + * works against {@link TableEnvironment} design and reuses some of the components (e.g. + * CatalogManager), but not all (e.g. Planner) which causes e.g. views registered in an in-memory + * catalog to fail. This class is a workaround not to keep Planner bound parts of a view reused + * across different {@link TableEnvironment}. + */ +public class EnvironmentReusableInMemoryCatalog extends GenericInMemoryCatalog { + public EnvironmentReusableInMemoryCatalog(String name, String defaultDatabase) { + super(name, defaultDatabase); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException { + CatalogBaseTable tableToRegister = + extractView(table) + .flatMap(QueryOperationCatalogView::getOriginalView) + .map(v -> (CatalogBaseTable) v) + .orElse(table); + super.createTable(tablePath, tableToRegister, ignoreIfExists); + } + + private Optional extractView(CatalogBaseTable table) { + if (table instanceof ResolvedCatalogView) { + final CatalogView origin = ((ResolvedCatalogView) table).getOrigin(); + if (origin instanceof QueryOperationCatalogView) { + return Optional.of((QueryOperationCatalogView) origin); + } + return Optional.empty(); + } else if (table instanceof QueryOperationCatalogView) { + return Optional.of((QueryOperationCatalogView) table); + } + return Optional.empty(); + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java index 12bfc2d8e5a87..c0dfad56f9238 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java @@ -30,7 +30,6 @@ import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogStoreHolder; import org.apache.flink.table.catalog.FunctionCatalog; -import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.factories.CatalogStoreFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.TableFactoryUtil; @@ -446,7 +445,7 @@ private static CatalogManager buildCatalogManager( catalogStore.config(), catalogStore.classLoader())) .orElse( - new GenericInMemoryCatalog( + new EnvironmentReusableInMemoryCatalog( defaultCatalogName, settings.getBuiltInDatabaseName())); } defaultCatalog.open(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java index 02e31d5112b7f..fd2b0a850c36f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/QueryOperationCatalogView.java @@ -116,4 +116,9 @@ public String getExpandedQuery() { public boolean supportsShowCreateView() { return originalView != null; } + + @Internal + public Optional getOriginalView() { + return Optional.ofNullable(originalView); + } }