diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 3af48d34b673..38d7390a733e 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -40,6 +40,43 @@ jackson-databind + + com.google.api + api-common + + + javax.annotation + javax.annotation-api + + + + + + com.google.apis + google-api-services-bigquery + v2-rev20250928-2.0.0 + + + + com.google.cloud + google-cloud-bigquery + + + com.google.guava + listenablefuture + + + javax.annotation + javax.annotation-api + + + + + + com.google.cloud + google-cloud-core + + com.google.errorprone error_prone_annotations @@ -117,6 +154,11 @@ trino-filesystem + + io.trino + trino-filesystem-gcs + + io.trino trino-filesystem-manager @@ -178,6 +220,11 @@ jakarta.annotation-api + + jakarta.inject + jakarta.inject-api + + jakarta.validation jakarta.validation-api @@ -220,6 +267,11 @@ iceberg-aws + + org.apache.iceberg + iceberg-bigquery + + org.apache.iceberg iceberg-core @@ -589,12 +641,6 @@ test - - io.trino.hadoop - hadoop-apache - test - - io.trino.hive hive-apache diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java index eacd42c99301..cb56eef61d0f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java @@ -22,5 +22,6 @@ public enum CatalogType JDBC, NESSIE, SNOWFLAKE, + BIGQUERY_METASTORE /**/; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 01ccaab618c9..2eb2aab85a47 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -49,6 +49,7 @@ import 
io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer; import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats; import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.bigquery.TrinoBigQueryMetastoreCatalog; import io.trino.plugin.iceberg.functions.IcebergFunctionProvider; import io.trino.plugin.iceberg.procedure.IcebergAddFilesFromTableHandle; import io.trino.plugin.iceberg.procedure.IcebergAddFilesHandle; @@ -1304,7 +1305,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con Location location = Location.of(transaction.table().location()); try { // S3 Tables internally assigns a unique location for each table - if (!isS3Tables(location.toString())) { + if (!isS3Tables(location.toString()) && !(catalog instanceof TrinoBigQueryMetastoreCatalog)) { TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), transaction.table().io().properties()); if (!replace && fileSystem.listFiles(location).hasNext()) { throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format("" + diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java index 7150be036ba4..1ef2726d4481 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java @@ -18,6 +18,7 @@ import io.airlift.configuration.AbstractConfigurationAwareModule; import io.trino.plugin.iceberg.CatalogType; import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.catalog.bigquery.IcebergBigQueryMetastoreModule; import io.trino.plugin.iceberg.catalog.file.IcebergFileMetastoreCatalogModule; import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule; import 
io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule; @@ -27,6 +28,7 @@ import io.trino.plugin.iceberg.catalog.snowflake.IcebergSnowflakeCatalogModule; import static io.airlift.configuration.ConditionalModule.conditionalModule; +import static io.trino.plugin.iceberg.CatalogType.BIGQUERY_METASTORE; import static io.trino.plugin.iceberg.CatalogType.GLUE; import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; import static io.trino.plugin.iceberg.CatalogType.JDBC; @@ -48,6 +50,7 @@ protected void setup(Binder binder) bindCatalogModule(JDBC, new IcebergJdbcCatalogModule()); bindCatalogModule(NESSIE, new IcebergNessieCatalogModule()); bindCatalogModule(SNOWFLAKE, new IcebergSnowflakeCatalogModule()); + bindCatalogModule(BIGQUERY_METASTORE, new IcebergBigQueryMetastoreModule()); } private void bindCatalogModule(CatalogType catalogType, Module module) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/BigQueryMetastoreIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/BigQueryMetastoreIcebergTableOperations.java new file mode 100644 index 000000000000..9cad1c5db496 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/BigQueryMetastoreIcebergTableOperations.java @@ -0,0 +1,253 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.trino.plugin.iceberg.catalog.bigquery;
+
+import com.google.api.pathtemplate.ValidationException;
+import com.google.api.services.bigquery.model.ExternalCatalogTableOptions;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.TableNotFoundException;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreClientImpl;
+import org.apache.iceberg.io.FileIO;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Verify.verify;
+import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
+import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
+import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
+import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
+import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
+
+/**
+ * Iceberg table operations that keep the pointer to the current table metadata file in the
+ * external-catalog table options of a BigQuery table.
+ */
+public class BigQueryMetastoreIcebergTableOperations
+        extends AbstractIcebergTableOperations
+{
+    private static final String TABLE_PROPERTIES_BQ_CONNECTION = "bq_connection";
+
+    private final BigQueryMetastoreClientImpl client;
+    private final String projectId;
+    private final String datasetId;
+    private final String tableId;
+    private final TableReference tableReference;
+    private final String tableName;
+
+    public BigQueryMetastoreIcebergTableOperations(
+            BigQueryMetastoreClientImpl bqmsClient,
+            FileIO fileIO,
+            ConnectorSession session,
+            String database,
+            String table,
+            Optional<String> owner,
+            Optional<String> location,
+            String projectId)
+    {
+        super(fileIO, session, database, table, owner, location);
+        this.client = requireNonNull(bqmsClient, "BigQueryClient is null");
+        this.projectId = requireNonNull(projectId, "projectId is null");
+        this.datasetId = requireNonNull(database, "database is null");
+        this.tableId = requireNonNull(table, "table is null");
+        this.tableReference = new TableReference()
+                .setProjectId(projectId)
+                .setDatasetId(datasetId)
+                .setTableId(tableId);
+        this.tableName = format("%s.%s.%s", this.projectId, this.datasetId, this.tableId);
+    }
+
+    /**
+     * Extracts the metadata file location from the table's external catalog options.
+     *
+     * @throws ValidationException if the options, their parameters, or the metadata location entry are missing
+     */
+    private String loadMetadataLocationOrThrow(ExternalCatalogTableOptions tableOptions)
+    {
+        Map<String, String> parameters = tableOptions == null ? null : tableOptions.getParameters();
+        String metadataLocation = parameters == null ? null : parameters.get(METADATA_LOCATION_PROP);
+        if (metadataLocation == null) {
+            throw new ValidationException(
+                    "Table %s is not a valid BigQuery Metastore Iceberg table, metadata location not found",
+                    this.tableName);
+        }
+        return metadataLocation;
+    }
+
+    @Override
+    protected String getRefreshedLocation(boolean invalidateCaches)
+    {
+        try {
+            return loadMetadataLocationOrThrow(client.load(tableReference).getExternalCatalogTableOptions());
+        }
+        catch (NoSuchTableException e) {
+            throw new TableNotFoundException(getSchemaTableName(), e);
+        }
+    }
+
+    /** Adds Hive-style basic statistics from snapshot metadata if it exists. */
+    private static void updateParametersWithSnapshotMetadata(TableMetadata metadata, Map<String, String> parameters)
+    {
+        if (metadata.currentSnapshot() == null) {
+            return;
+        }
+
+        Map<String, String> summary = metadata.currentSnapshot().summary();
+        copyIfPresent(summary, SnapshotSummary.TOTAL_DATA_FILES_PROP, parameters, "numFiles");
+        copyIfPresent(summary, SnapshotSummary.TOTAL_RECORDS_PROP, parameters, "numRows");
+        copyIfPresent(summary, SnapshotSummary.TOTAL_FILE_SIZE_PROP, parameters, "totalSize");
+    }
+
+    /** Copies {@code summary[summaryKey]} into {@code parameters[parameterKey]} when the summary has a value for it. */
+    private static void copyIfPresent(Map<String, String> summary, String summaryKey, Map<String, String> parameters, String parameterKey)
+    {
+        String value = summary.get(summaryKey);
+        if (value != null) {
+            parameters.put(parameterKey, value);
+        }
+    }
+
+    /**
+     * Builds the parameter map stored on the BigQuery table: the Iceberg table properties plus the
+     * uuid, the previous and new metadata locations, the table type markers and snapshot statistics.
+     */
+    private Map<String, String> buildTableParameters(String metadataFileLocation, TableMetadata metadata)
+    {
+        Map<String, String> parameters = Maps.newHashMap(metadata.properties());
+        if (metadata.uuid() != null) {
+            parameters.put(TableProperties.UUID, metadata.uuid());
+        }
+        if (metadata.metadataFileLocation() != null && !metadata.metadataFileLocation().isEmpty()) {
+            parameters.put(PREVIOUS_METADATA_LOCATION_PROP, metadata.metadataFileLocation());
+        }
+        parameters.put(METADATA_LOCATION_PROP, metadataFileLocation);
+        parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE);
+        parameters.put("EXTERNAL", "TRUE");
+
+        updateParametersWithSnapshotMetadata(metadata, parameters);
+        return parameters;
+    }
+
+    private Table makeNewTable(TableMetadata metadata, String metadataFileLocation)
+    {
+        return new Table()
+                .setExternalCatalogTableOptions(
+                        BigQueryMetastoreIcebergUtil.createExternalCatalogTableOptions(
+                                metadata.location(), buildTableParameters(metadataFileLocation, metadata)));
+    }
+
+    /** Sets the connection id on the table's external catalog options when the table properties define one. */
+    private void addConnectionIfProvided(Table tableBuilder, Map<String, String> metadataProperties)
+    {
+        ExternalCatalogTableOptions options = tableBuilder.getExternalCatalogTableOptions();
+        if (options != null && metadataProperties.containsKey(TABLE_PROPERTIES_BQ_CONNECTION)) {
+            options.setConnectionId(metadataProperties.get(TABLE_PROPERTIES_BQ_CONNECTION));
+        }
+    }
+
+    private void createTable(String newMetadataLocation, TableMetadata metadata)
+    {
+        Table tableBuilder = makeNewTable(metadata, newMetadataLocation);
+        tableBuilder.setTableReference(this.tableReference);
+        addConnectionIfProvided(tableBuilder, metadata.properties());
+        client.create(tableBuilder);
+    }
+
+    /** Update table properties with concurrent update detection using etag. */
+    private void updateTable(String oldMetadataLocation, String newMetadataLocation, TableMetadata metadata)
+    {
+        Table table = client.load(tableReference);
+        if (Strings.isNullOrEmpty(table.getEtag())) {
+            throw new ValidationException(
+                    "Etag of legacy table %s is empty, manually update the table via the BigQuery API or"
+                            + " recreate and retry",
+                    this.tableName);
+        }
+        ExternalCatalogTableOptions options = table.getExternalCatalogTableOptions();
+        if (options == null) {
+            // Without external catalog options there is nowhere to read or store the metadata pointer
+            throw new ValidationException(
+                    "Table %s is not a valid BigQuery Metastore Iceberg table, external catalog table options not found",
+                    this.tableName);
+        }
+        addConnectionIfProvided(table, metadata.properties());
+
+        Map<String, String> currentParameters = options.getParameters();
+        String metadataLocationFromMetastore = currentParameters == null
+                ? ""
+                : currentParameters.getOrDefault(METADATA_LOCATION_PROP, "");
+        if (!metadataLocationFromMetastore.isEmpty()
+                && !metadataLocationFromMetastore.equals(oldMetadataLocation)) {
+            throw new CommitFailedException(
+                    "Cannot commit base metadata location '%s' is not same as the current table metadata location '%s' for"
+                            + " %s.%s",
+                    oldMetadataLocation,
+                    metadataLocationFromMetastore,
+                    tableReference.getDatasetId(),
+                    tableReference.getTableId());
+        }
+
+        options.setParameters(buildTableParameters(newMetadataLocation, metadata));
+        try {
+            client.update(tableReference, table);
+        }
+        // NOTE(review): BigQueryMetastoreClientImpl is an Iceberg class, so an etag conflict presumably
+        // surfaces as Iceberg's ValidationException rather than the imported Google one - confirm.
+        // Both are caught so the conflict is mapped to CommitFailedException either way.
+        catch (ValidationException | org.apache.iceberg.exceptions.ValidationException e) {
+            String message = e.getMessage();
+            if (message != null && message.toLowerCase(Locale.ENGLISH).contains("etag mismatch")) {
+                throw new CommitFailedException(
+                        e,
+                        "Updating table failed due to conflict updates (etag mismatch). Retry the update");
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    protected void commitNewTable(TableMetadata metadata)
+    {
+        verify(version.isEmpty(), "commitNewTable called on a table which already exists");
+        String newMetadataLocation = writeNewMetadata(metadata, 0);
+        try {
+            createTable(newMetadataLocation, metadata);
+        }
+        catch (RuntimeException e) {
+            TrinoException commitFailure = new TrinoException(ICEBERG_COMMIT_ERROR, "Cannot commit table creation", e);
+            try {
+                // The table was not registered, so the metadata file that was just written is orphaned
+                io().deleteFile(newMetadataLocation);
+            }
+            catch (RuntimeException cleanupFailure) {
+                // Keep the commit failure as the primary error; a cleanup problem must not mask it
+                commitFailure.addSuppressed(new TrinoException(
+                        ICEBERG_FILESYSTEM_ERROR,
+                        format("Failed to clean up metadata file [%s] for table %s", newMetadataLocation, getSchemaTableName()),
+                        cleanupFailure));
+            }
+            throw commitFailure;
+        }
+        shouldRefresh = true;
+    }
+
+    @Override
+    protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
+    {
+        String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1);
+        try {
+            updateTable(base.metadataFileLocation(), newMetadataLocation, metadata);
+        }
+        catch (Exception e) {
+            throw new TrinoException(ICEBERG_COMMIT_ERROR, "Cannot update table", e);
+        }
+        shouldRefresh = true;
+    }
+
+    @Override
+    protected void commitMaterializedViewRefresh(TableMetadata base, TableMetadata metadata)
+    {
+        throw new UnsupportedOperationException("Committing views through BigQuery Metastore is not yet implemented");
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/BigQueryMetastoreIcebergTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/BigQueryMetastoreIcebergTableOperationsProvider.java
new file mode 100644
index 000000000000..913c2439780a
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/BigQueryMetastoreIcebergTableOperationsProvider.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the
"License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.bigquery;
+
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory;
+import io.trino.spi.connector.ConnectorSession;
+import jakarta.inject.Inject;
+import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreClientImpl;
+
+import java.util.Optional;
+
+import static io.trino.plugin.iceberg.IcebergSessionProperties.isUseFileSizeFromMetadata;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Creates {@link BigQueryMetastoreIcebergTableOperations} instances that share one metastore
+ * client and the project id taken from the catalog configuration.
+ */
+public class BigQueryMetastoreIcebergTableOperationsProvider
+        implements IcebergTableOperationsProvider
+{
+    private final TrinoFileSystemFactory fileSystemFactory;
+    private final ForwardingFileIoFactory fileIoFactory;
+    private final BigQueryMetastoreClientImpl bqmsClient;
+    private final String projectId;
+
+    @Inject
+    public BigQueryMetastoreIcebergTableOperationsProvider(
+            TrinoFileSystemFactory fileSystemFactory,
+            ForwardingFileIoFactory fileIoFactory,
+            BigQueryMetastoreClientImpl bqmsClient,
+            IcebergBigQueryMetastoreCatalogConfig config)
+    {
+        this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+        this.fileIoFactory = requireNonNull(fileIoFactory, "fileIoFactory is null");
+        this.bqmsClient = requireNonNull(bqmsClient, "BQMS Client is null");
+        // Check the config object itself first so a missing binding fails with a clear message
+        this.projectId = requireNonNull(requireNonNull(config, "config is null").getProjectID(), "projectId is null");
+    }
+
+    /**
+     * Creates table operations for the given table, backed by a file IO built from a file system
+     * created for the session.
+     */
+    @Override
+    public IcebergTableOperations createTableOperations(
+            TrinoCatalog catalog,
+            ConnectorSession session,
+            String database,
+            String table,
+            Optional<String> owner,
+            Optional<String> location)
+    {
+        return new BigQueryMetastoreIcebergTableOperations(
+                bqmsClient,
+                fileIoFactory.create(fileSystemFactory.create(session), isUseFileSizeFromMetadata(session)),
+                session,
+                database,
+                table,
+                owner,
+                location,
+                projectId);
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/BigQueryMetastoreIcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/BigQueryMetastoreIcebergUtil.java
new file mode 100644
index 000000000000..3e4df63614f4
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/BigQueryMetastoreIcebergUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.bigquery;
+
+import com.google.api.services.bigquery.model.ExternalCatalogDatasetOptions;
+import com.google.api.services.bigquery.model.ExternalCatalogTableOptions;
+import com.google.api.services.bigquery.model.SerDeInfo;
+import com.google.api.services.bigquery.model.StorageDescriptor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Static helpers that build the BigQuery external-catalog option objects used for Iceberg tables and datasets. */
+public final class BigQueryMetastoreIcebergUtil
+{
+    private static final String HIVE_SERIALIZATION_LIBRARY =
+            "org.apache.iceberg.mr.hive.HiveIcebergSerDe";
+    private static final String HIVE_FILE_INPUT_FORMAT =
+            "org.apache.iceberg.mr.hive.HiveIcebergInputFormat";
+    private static final String HIVE_FILE_OUTPUT_FORMAT =
+            "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat";
+
+    private BigQueryMetastoreIcebergUtil() {}
+
+    /**
+     * Creates a new ExternalCatalogTableOptions object populated with the supported library constants
+     * and parameters given.
+     *
+     * @param locationUri storage location uri
+     * @param parameters table metadata parameters
+     * @return options carrying the Hive Iceberg serde, input/output formats, location and parameters
+     */
+    public static ExternalCatalogTableOptions createExternalCatalogTableOptions(
+            String locationUri, Map<String, String> parameters)
+    {
+        SerDeInfo serDeInfo = new SerDeInfo().setSerializationLibrary(HIVE_SERIALIZATION_LIBRARY);
+
+        StorageDescriptor storageDescriptor =
+                new StorageDescriptor()
+                        .setLocationUri(locationUri)
+                        .setInputFormat(HIVE_FILE_INPUT_FORMAT)
+                        .setOutputFormat(HIVE_FILE_OUTPUT_FORMAT)
+                        .setSerdeInfo(serDeInfo);
+
+        return new ExternalCatalogTableOptions()
+                .setStorageDescriptor(storageDescriptor)
+                .setParameters(parameters);
+    }
+
+    /**
+     * Creates a new ExternalCatalogDatasetOptions object populated with the supported library
+     * constants and parameters given.
+     *
+     * @param defaultStorageLocationUri dataset's default location uri
+     * @param metadataParameters metadata parameters for the dataset; values are stored via
+     *         {@code toString()} and entries with a null value are skipped
+     * @return options carrying the default storage location and the stringified parameters
+     */
+    public static ExternalCatalogDatasetOptions createExternalCatalogDatasetOptions(
+            String defaultStorageLocationUri, Map<String, Object> metadataParameters)
+    {
+        Map<String, String> parameters = new HashMap<>();
+        metadataParameters.forEach((key, value) -> {
+            // A null value cannot be stringified; omit it rather than fail with a NullPointerException
+            if (value != null) {
+                parameters.put(key, value.toString());
+            }
+        });
+        return new ExternalCatalogDatasetOptions()
+                .setDefaultStorageLocationUri(defaultStorageLocationUri)
+                .setParameters(parameters);
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/IcebergBigQueryMetastoreCatalogConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/IcebergBigQueryMetastoreCatalogConfig.java
new file mode 100644
index 000000000000..c9f28e786609
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/IcebergBigQueryMetastoreCatalogConfig.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.bigquery;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+
+/**
+ * Configuration properties of the BigQuery Metastore (BQMS) Iceberg catalog.
+ *
+ * <p>All properties are plain strings and default to {@code null} when not configured.
+ */
+public class IcebergBigQueryMetastoreCatalogConfig
+{
+    private String projectID;
+    private String location;
+    private String listAllTables;
+    private String warehouse;
+    private String jsonKeyFilePath;
+
+    public String getProjectID()
+    {
+        return projectID;
+    }
+
+    @Config("iceberg.bqms-catalog.project-id")
+    @ConfigDescription("The project id to use for BQMS")
+    public IcebergBigQueryMetastoreCatalogConfig setProjectID(String projectID)
+    {
+        this.projectID = projectID;
+        return this;
+    }
+
+    public String getLocation()
+    {
+        return location;
+    }
+
+    @Config("iceberg.bqms-catalog.location")
+    @ConfigDescription("The location to use for BQMS")
+    public IcebergBigQueryMetastoreCatalogConfig setLocation(String location)
+    {
+        this.location = location;
+        return this;
+    }
+
+    public String getListAllTables()
+    {
+        return listAllTables;
+    }
+
+    @Config("iceberg.bqms-catalog.list-all-tables")
+    @ConfigDescription("The list all tables config for BQMS")
+    public IcebergBigQueryMetastoreCatalogConfig setListAllTables(String listAllTables)
+    {
+        this.listAllTables = listAllTables;
+        return this;
+    }
+
+    public String getWarehouse()
+    {
+        return warehouse;
+    }
+
+    // The description previously duplicated the list-all-tables text
+    @Config("iceberg.bqms-catalog.warehouse")
+    @ConfigDescription("The warehouse location used as the base for default namespace storage locations in BQMS")
+    public IcebergBigQueryMetastoreCatalogConfig setWarehouse(String warehouse)
+    {
+        this.warehouse = warehouse;
+        return this;
+    }
+
+    public String getJsonKeyFilePath()
+    {
+        return jsonKeyFilePath;
+    }
+
+    @Config("iceberg.bqms-catalog.json-key-file-path")
+    @ConfigDescription("Service account will be used to connect BQMS")
+    public IcebergBigQueryMetastoreCatalogConfig setJsonKeyFilePath(String jsonKeyFilePath)
+    {
+        this.jsonKeyFilePath = jsonKeyFilePath;
+        return this;
+    }
+}
diff --git
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/IcebergBigQueryMetastoreModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/IcebergBigQueryMetastoreModule.java
new file mode 100644
index 000000000000..8358eee9bfcd
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/IcebergBigQueryMetastoreModule.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.bigquery;
+
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
+import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreClientImpl;
+
+import javax.security.auth.login.CredentialException;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static org.weakref.jmx.guice.ExportBinder.newExporter;
+
+/**
+ * Guice module for the BigQuery Metastore Iceberg catalog: binds the catalog configuration, the
+ * table operations provider and the catalog factory, and provides the shared metastore client.
+ */
+public class IcebergBigQueryMetastoreModule
+        extends AbstractConfigurationAwareModule
+{
+    @Override
+    protected void setup(Binder binder)
+    {
+        configBinder(binder).bindConfig(IcebergBigQueryMetastoreCatalogConfig.class);
+        binder.bind(IcebergTableOperationsProvider.class).to(BigQueryMetastoreIcebergTableOperationsProvider.class).in(Scopes.SINGLETON);
+        newExporter(binder).export(IcebergTableOperationsProvider.class).withGeneratedName();
+        binder.bind(TrinoCatalogFactory.class).to(TrinoBigQueryMetastoreCatalogFactory.class).in(Scopes.SINGLETON);
+        newExporter(binder).export(TrinoCatalogFactory.class).withGeneratedName();
+    }
+
+    /**
+     * Builds the singleton BigQuery Metastore client from the catalog configuration.
+     *
+     * <p>Project id, location and service-account key file are each applied only when configured.
+     *
+     * @throws CredentialException if reading the configured service-account key file fails
+     */
+    @Provides
+    @Singleton
+    public static BigQueryMetastoreClientImpl createBigQueryMetastoreClient(IcebergBigQueryMetastoreCatalogConfig config)
+            throws GeneralSecurityException, IOException
+    {
+        BigQueryOptions.Builder optionsBuilder = BigQueryOptions.newBuilder();
+
+        if (config.getProjectID() != null) {
+            optionsBuilder.setProjectId(config.getProjectID());
+        }
+        if (config.getLocation() != null) {
+            optionsBuilder.setLocation(config.getLocation());
+        }
+        if (config.getJsonKeyFilePath() != null) {
+            optionsBuilder.setCredentials(loadServiceAccountCredentials(config.getJsonKeyFilePath()));
+        }
+        return new BigQueryMetastoreClientImpl(optionsBuilder.build());
+    }
+
+    /**
+     * Reads service-account credentials from the given JSON key file.
+     *
+     * @throws CredentialException if reading the key file fails; the original failure is attached as the cause
+     */
+    private static ServiceAccountCredentials loadServiceAccountCredentials(String jsonKeyFilePath)
+            throws CredentialException
+    {
+        try (FileInputStream keyStream = new FileInputStream(jsonKeyFilePath)) {
+            return ServiceAccountCredentials.fromStream(keyStream);
+        }
+        catch (IOException | RuntimeException e) {
+            // CredentialException has no (message, cause) constructor, so the cause is attached via initCause
+            CredentialException failure = new CredentialException("Unable to locate GCP Service Account JSON file: " + e.getMessage());
+            failure.initCause(e);
+            throw failure;
+        }
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/TrinoBigQueryMetastoreCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/TrinoBigQueryMetastoreCatalog.java
new file mode 100644
index 000000000000..7437c61b1c08
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/TrinoBigQueryMetastoreCatalog.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.bigquery; + +import com.google.api.client.util.Maps; +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.DatasetList; +import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.ExternalCatalogDatasetOptions; +import com.google.api.services.bigquery.model.TableReference; +import com.google.common.base.Strings; +import com.google.common.cache.Cache; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.UncheckedExecutionException; +import io.trino.cache.EvictableCacheBuilder; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.metastore.TableInfo; +import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory; +import io.trino.spi.TrinoException; +import io.trino.spi.catalog.CatalogName; +import io.trino.spi.connector.CatalogSchemaTableName; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.RelationColumnsMetadata; +import io.trino.spi.connector.RelationCommentMetadata; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.security.TrinoPrincipal; +import io.trino.spi.type.TypeManager; +import 
jakarta.annotation.Nullable;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreClientImpl;
+import org.apache.iceberg.util.LocationUtil;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.cache.CacheUtils.uncheckedCacheGet;
+import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR;
+import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
+import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Trino catalog that keeps Iceberg namespaces and tables in BigQuery Metastore.
+ *
+ * <p>A Trino schema maps to one BigQuery dataset of the configured project, so only single level
+ * namespaces are accepted. Views, materialized views, renames, principals and table
+ * (un)registration are rejected with {@code NOT_SUPPORTED}.
+ */
+public class TrinoBigQueryMetastoreCatalog
+        extends AbstractTrinoCatalog
+{
+    private static final int PER_QUERY_CACHE_SIZE = 1000;
+
+    // User provided properties.
+    public static final String PROJECT_ID = "gcp.bigquery.project-id";
+    public static final String GCP_LOCATION = "gcp.bigquery.location";
+    public static final String LIST_ALL_TABLES = "gcp.bigquery.list-all-tables";
+    public static final String WAREHOUSE = "gcp.bigquery.warehouse";
+
+    private final BigQueryMetastoreClientImpl client;
+    private final String projectID;
+    private final String gcpLocation;
+    private final String warehouse;
+    private final boolean listAllTables;
+
+    // Table metadata already loaded through this catalog instance, bounded by PER_QUERY_CACHE_SIZE entries
+    private final Cache<SchemaTableName, TableMetadata> tableMetadataCache = EvictableCacheBuilder.newBuilder()
+            .maximumSize(PER_QUERY_CACHE_SIZE)
+            .build();
+
+    /**
+     * @param listAllTables textual boolean, parsed with {@link Boolean#parseBoolean(String)}; {@code null} means {@code false}
+     * @param warehouse base location for new datasets and tables; may be {@code null}, in which case
+     * every operation that needs a default location fails
+     * @throws NullPointerException if {@code bigQueryMetastoreClient} is null
+     */
+    public TrinoBigQueryMetastoreCatalog(
+            CatalogName catalogName,
+            TypeManager typeManager,
+            TrinoFileSystemFactory fileSystemFactory,
+            ForwardingFileIoFactory fileIoFactory,
+            IcebergTableOperationsProvider tableOperationsProvider,
+            BigQueryMetastoreClientImpl bigQueryMetastoreClient,
+            String gcpLocation,
+            String projectID,
+            String listAllTables,
+            String warehouse,
+            boolean isUniqueTableLocation)
+    {
+        super(catalogName, isUniqueTableLocation, typeManager, tableOperationsProvider, fileSystemFactory, fileIoFactory);
+        this.client = requireNonNull(bigQueryMetastoreClient, "bigQueryMetastoreClient is null");
+        this.projectID = projectID;
+        this.gcpLocation = gcpLocation;
+        this.warehouse = warehouse;
+        this.listAllTables = Boolean.parseBoolean(listAllTables);
+    }
+
+    @Override
+    protected Optional<ConnectorMaterializedViewDefinition> doGetMaterializedView(ConnectorSession session, SchemaTableName schemaViewName)
+    {
+        return Optional.empty();
+    }
+
+    @Override
+    protected void invalidateTableCache(SchemaTableName schemaTableName)
+    {
+        tableMetadataCache.invalidate(schemaTableName);
+    }
+
+    /**
+     * Flattens the external catalog options of a dataset into schema properties: all option
+     * parameters plus, when set, the default storage location under the {@code location} key.
+     */
+    private Map<String, Object> toMetadata(Dataset dataset)
+    {
+        ExternalCatalogDatasetOptions options = dataset.getExternalCatalogDatasetOptions();
+        Map<String, Object> metadata = Maps.newHashMap();
+        if (options != null) {
+            if (options.getParameters() != null) {
+                metadata.putAll(options.getParameters());
+            }
+
+            if (!Strings.isNullOrEmpty(options.getDefaultStorageLocationUri())) {
+                metadata.put("location", options.getDefaultStorageLocationUri());
+            }
+        }
+
+        return metadata;
+    }
+
+    /**
+     * Reports whether the dataset backing {@code namespace} can be loaded.
+     *
+     * <p>Only a missing dataset, or a name that is not a single level namespace, yields
+     * {@code false}. Any other failure propagates instead of being reported as "absent".
+     */
+    @Override
+    public boolean namespaceExists(ConnectorSession session, String namespace)
+    {
+        try {
+            client.load(toDatasetReference(namespace));
+            return true;
+        }
+        catch (NoSuchNamespaceException | IllegalArgumentException e) {
+            // missing dataset, or a multi level name rejected by toDatasetReference
+            return false;
+        }
+    }
+
+    private String toNamespaceName(DatasetList.Datasets dataset)
+    {
+        return dataset.getDatasetReference().getDatasetId();
+    }
+
+    /**
+     * Lists the dataset ids of the configured project.
+     */
+    @Override
+    public List<String> listNamespaces(ConnectorSession session)
+    {
+        List<DatasetList.Datasets> allDatasets = client.list(this.projectID);
+        return allDatasets.stream().map(this::toNamespaceName).collect(toImmutableList());
+    }
+
+    /**
+     * Deletes the dataset backing {@code namespace}.
+     *
+     * @throws TrinoException with {@code ICEBERG_CATALOG_ERROR} when the client reports the namespace as missing
+     */
+    @Override
+    public void dropNamespace(ConnectorSession session, String namespace)
+    {
+        try {
+            client.delete(toDatasetReference(namespace));
+        }
+        catch (NoSuchNamespaceException e) {
+            throw new TrinoException(ICEBERG_CATALOG_ERROR, e);
+        }
+    }
+
+    /**
+     * Checks that {@code namespace} has exactly one dot separated level and returns that level.
+     *
+     * @throws IllegalArgumentException if the namespace does not have exactly one level
+     */
+    private static String singleLevelNamespace(String namespace)
+    {
+        // split once and reuse the result for the check, the message and the return value
+        List<String> levels = Arrays.asList(namespace.split("\\."));
+        checkArgument(
+                levels.size() == 1,
+                String.format(
+                        Locale.ROOT,
+                        "BigQuery Metastore only supports single level namespaces. Invalid namespace: \"%s\" has %s levels",
+                        namespace,
+                        levels.size()));
+        return levels.getFirst();
+    }
+
+    private DatasetReference toDatasetReference(String namespace)
+    {
+        return new DatasetReference()
+                .setProjectId(projectID)
+                .setDatasetId(singleLevelNamespace(namespace));
+    }
+
+    private TableReference toTableReference(TableIdentifier tableIdentifier)
+    {
+        DatasetReference datasetReference = toDatasetReference(tableIdentifier.namespace().level(0));
+        return new TableReference()
+                .setProjectId(datasetReference.getProjectId())
+                .setDatasetId(datasetReference.getDatasetId())
+                .setTableId(tableIdentifier.name());
+    }
+
+    /**
+     * Loads the properties of the dataset backing {@code namespace}.
+     *
+     * @throws NoSuchNamespaceException if the dataset is missing or the name is not a single level
+     * namespace; the original failure is kept as the cause. Other client failures propagate unchanged.
+     */
+    @Override
+    public Map<String, Object> loadNamespaceMetadata(ConnectorSession session, String namespace)
+    {
+        try {
+            return toMetadata(client.load(toDatasetReference(namespace)));
+        }
+        catch (IllegalArgumentException e) {
+            // a multi level name cannot exist in BigQuery Metastore
+            throw new NoSuchNamespaceException(e, "%s", e.getMessage());
+        }
+    }
+
+    @Override
+    public Optional<TrinoPrincipal> getNamespacePrincipal(ConnectorSession session, String namespace)
+    {
+        return Optional.empty();
+    }
+
+    /**
+     * Builds {@code <warehouse>/<dbId>.db} after stripping a trailing slash from the warehouse.
+     *
+     * @throws IllegalArgumentException if no warehouse is configured
+     */
+    private String createDefaultStorageLocationUri(String dbId)
+    {
+        checkArgument(
+                warehouse != null,
+                String.format(
+                        "Invalid data warehouse location: %s not set", WAREHOUSE));
+        return String.format("%s/%s.db", LocationUtil.stripTrailingSlash(warehouse), dbId);
+    }
+
+    /**
+     * Creates a dataset in the configured GCP location with a default storage location under the warehouse.
+     * The {@code owner} argument is not used.
+     */
+    @Override
+    public void createNamespace(ConnectorSession session, String namespace, Map<String, Object> properties, TrinoPrincipal owner)
+    {
+        DatasetReference datasetReference = toDatasetReference(namespace);
+        Dataset dataset = new Dataset();
+        dataset.setLocation(this.gcpLocation);
+        dataset.setDatasetReference(datasetReference);
+        dataset.setExternalCatalogDatasetOptions(
+                BigQueryMetastoreIcebergUtil.createExternalCatalogDatasetOptions(
+                        createDefaultStorageLocationUri(datasetReference.getDatasetId()), properties));
+
+        client.create(dataset);
+    }
+
+    @Override
+    public void setNamespacePrincipal(ConnectorSession session, String namespace, TrinoPrincipal principal)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "setNamespacePrincipal is not supported for BigQuery Metastore");
+    }
+
+    @Override
+    public void renameNamespace(ConnectorSession session, String source, String target)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "renameNamespace is not supported for BigQuery Metastore");
+    }
+
+    /**
+     * Resolves which namespaces a listing covers: the requested one, or every namespace of the
+     * project when the caller did not restrict the listing.
+     */
+    private List<String> namespacesToList(ConnectorSession session, Optional<String> namespace)
+    {
+        if (namespace.isPresent()) {
+            return List.of(namespace.get());
+        }
+        return listNamespaces(session);
+    }
+
+    /**
+     * Lists the tables of one namespace; {@code allTables} is handed to the client unchanged.
+     */
+    private List<TableInfo> list(String namespace, boolean allTables)
+    {
+        return client.list(toDatasetReference(namespace), allTables).stream()
+                .map(table -> new TableInfo(
+                        SchemaTableName.schemaTableName(namespace, table.getTableReference().getTableId()),
+                        TableInfo.ExtendedRelationType.TABLE))
+                .collect(toImmutableList());
+    }
+
+    /**
+     * Lists tables of the given namespace, or of every namespace when {@code namespace} is empty.
+     */
+    @Override
+    public List<TableInfo> listTables(ConnectorSession session, Optional<String> namespace)
+    {
+        return namespacesToList(session, namespace).stream()
+                .flatMap(name -> list(name, listAllTables).stream())
+                .collect(toImmutableList());
+    }
+
+    /**
+     * Same as {@link #listTables} but always asks the client with {@code allTables = false}.
+     */
+    @Override
+    public List<SchemaTableName> listIcebergTables(ConnectorSession session, Optional<String> namespace)
+    {
+        return namespacesToList(session, namespace).stream()
+                .flatMap(name -> list(name, false).stream())
+                .map(TableInfo::tableName)
+                .collect(toImmutableList());
+    }
+
+    @Override
+    public Optional<Iterator<RelationColumnsMetadata>> streamRelationColumns(ConnectorSession session, Optional<String> namespace, UnaryOperator<Set<SchemaTableName>> relationFilter, Predicate<SchemaTableName> isRedirected)
+    {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Iterator<RelationCommentMetadata>> streamRelationComments(ConnectorSession session, Optional<String> namespace, UnaryOperator<Set<SchemaTableName>> relationFilter, Predicate<SchemaTableName> isRedirected)
+    {
+        return Optional.empty();
+    }
+
+    @Override
+    public Transaction newCreateTableTransaction(ConnectorSession session, SchemaTableName schemaTableName, Schema schema, PartitionSpec partitionSpec, SortOrder sortOrder, Optional<String> location, Map<String, String> properties)
+    {
+        // the session user is passed on as the table owner
+        return newCreateTableTransaction(
+                session,
+                schemaTableName,
+                schema,
+                partitionSpec,
+                sortOrder,
+                location,
+                properties,
+                Optional.of(session.getUser()));
+    }
+
+    @Override
+    public Transaction newCreateOrReplaceTableTransaction(ConnectorSession session, SchemaTableName schemaTableName, Schema schema, PartitionSpec partitionSpec, SortOrder sortOrder, String location, Map<String, String> properties)
+    {
+        // the session user is passed on as the table owner
+        return newCreateOrReplaceTableTransaction(
+                session,
+                schemaTableName,
+                schema,
+                partitionSpec,
+                sortOrder,
+                location,
+                properties,
+                Optional.of(session.getUser()));
+    }
+
+    @Override
+    public void registerTable(ConnectorSession session, SchemaTableName tableName, TableMetadata tableMetadata)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "registerTable is not supported for BigQuery Metastore.");
+    }
+
+    @Override
+    public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "unregisterTable is not supported for BigQuery Metastore.");
+    }
+
+    /**
+     * Deletes the table entry through the client and evicts its cached metadata.
+     */
+    @Override
+    public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
+    {
+        TableIdentifier identifier = TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());
+        client.delete(toTableReference(identifier));
+        invalidateTableCache(schemaTableName);
+    }
+
+    @Override
+    public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaTableName)
+    {
+        dropTable(session, schemaTableName);
+    }
+
+    @Override
+    public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "Table rename is not supported for BigQuery Metastore");
+    }
+
+    /**
+     * Loads a table, reading its current metadata at most once per cache entry.
+     */
+    @Override
+    public BaseTable loadTable(ConnectorSession session, SchemaTableName table)
+    {
+        TableMetadata metadata;
+        try {
+            metadata = uncheckedCacheGet(
+                    tableMetadataCache,
+                    table,
+                    () -> {
+                        TableOperations operations = tableOperationsProvider.createTableOperations(
+                                this,
+                                session,
+                                table.getSchemaName(),
+                                table.getTableName(),
+                                Optional.empty(),
+                                Optional.empty());
+                        return new BaseTable(operations, quotedTableName(table)).operations().current();
+                    });
+        }
+        catch (UncheckedExecutionException e) {
+            // surface the loader's own unchecked failure rather than the cache wrapper
+            throwIfUnchecked(e.getCause());
+            throw e;
+        }
+
+        return getIcebergTableWithMetadata(
+                this,
+                tableOperationsProvider,
+                session,
+                table,
+                metadata);
+    }
+
+    @Override
+    public Map<SchemaTableName, List<ColumnMetadata>> tryGetColumnMetadata(ConnectorSession session, List<SchemaTableName> tables)
+    {
+        return ImmutableMap.of();
+    }
+
+    @Override
+    public void updateViewComment(ConnectorSession session, SchemaTableName schemaViewName, Optional<String> comment)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "updateViewComment is not supported for BigQuery Metastore");
+    }
+
+    @Override
+    public void updateViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional<String> comment)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "updateViewColumnComment is not supported for BigQuery Metastore");
+    }
+
+    /**
+     * Returns {@code <warehouse>/<schema>.db/<table>}.
+     *
+     * @throws IllegalArgumentException if no warehouse is configured
+     */
+    @Nullable
+    @Override
+    public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
+    {
+        return String.format("%s/%s", createDefaultStorageLocationUri(schemaTableName.getSchemaName()), schemaTableName.getTableName());
+    }
+
+    @Override
+    public void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "setTablePrincipal is not supported for BigQuery Metastore");
+    }
+
+    @Override
+    public void createView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorViewDefinition definition, boolean replace)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "createView is not supported for BigQuery Metastore");
+    }
+
+    @Override
+    public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "renameView is not supported for BigQuery Metastore");
+    }
+
+    @Override
+    public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaViewName, TrinoPrincipal principal)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "setViewPrincipal is not supported for BigQuery Metastore");
+    }
+
+    @Override
+    public void dropView(ConnectorSession session, SchemaTableName schemaViewName)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "dropView is not supported for BigQuery Metastore");
+    }
+
+    @Override
+    public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
+    {
+        return Optional.empty();
+    }
+
+    @Override
+    public void createMaterializedView(ConnectorSession session, SchemaTableName viewName, ConnectorMaterializedViewDefinition definition, Map<String, Object> materializedViewProperties, boolean replace, boolean ignoreExisting)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "createMaterializedView is not supported for BigQuery Metastore");
+    }
+
+    @Override
+    public void updateMaterializedViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional<String> comment)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "updateMaterializedViewColumnComment is not supported for BigQuery Metastore");
+    }
+
+    @Override
+    public void dropMaterializedView(ConnectorSession session, SchemaTableName viewName)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "dropMaterializedView is not supported for BigQuery Metastore");
+    }
+
+    @Override
+    public Optional<BaseTable> getMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName)
+    {
+        return Optional.empty();
+    }
+
+    @Override
+    public void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target)
+    {
+        throw new TrinoException(NOT_SUPPORTED, "renameMaterializedView is not supported for BigQuery Metastore");
+    }
+
+    @Override
+    public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, SchemaTableName tableName, String hiveCatalogName)
+    {
+        return Optional.empty();
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/TrinoBigQueryMetastoreCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/TrinoBigQueryMetastoreCatalogFactory.java
new file mode 100644
index 000000000000..7dacdbf7396e
--- /dev/null
+++
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/bigquery/TrinoBigQueryMetastoreCatalogFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.bigquery;
+
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.plugin.iceberg.IcebergConfig;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
+import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory;
+import io.trino.spi.catalog.CatalogName;
+import io.trino.spi.security.ConnectorIdentity;
+import io.trino.spi.type.TypeManager;
+import jakarta.inject.Inject;
+import org.apache.iceberg.gcp.bigquery.BigQueryMetastoreClientImpl;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Injectable factory that produces {@link TrinoBigQueryMetastoreCatalog} instances.
+ * All BigQuery Metastore settings are captured once at construction and reused for every catalog.
+ */
+public class TrinoBigQueryMetastoreCatalogFactory
+        implements TrinoCatalogFactory
+{
+    private final IcebergTableOperationsProvider tableOperationsProvider;
+    private final BigQueryMetastoreClientImpl bigQueryMetastoreClient;
+    private final CatalogName catalogName;
+    private final TypeManager typeManager;
+    private final TrinoFileSystemFactory fileSystemFactory;
+    private final ForwardingFileIoFactory fileIoFactory;
+    private final String projectID;
+    private final String gcpLocation;
+    private final String listAllTables;
+    private final String warehouse;
+    private final boolean isUniqueTableLocation;
+
+    /**
+     * @throws NullPointerException if any injected dependency or configuration object is null
+     */
+    @Inject
+    public TrinoBigQueryMetastoreCatalogFactory(
+            CatalogName catalogName,
+            TypeManager typeManager,
+            TrinoFileSystemFactory fileSystemFactory,
+            ForwardingFileIoFactory fileIoFactory,
+            IcebergTableOperationsProvider tableOperationsProvider,
+            BigQueryMetastoreClientImpl bigQueryMetastoreClient,
+            IcebergBigQueryMetastoreCatalogConfig icebergBigQueryMetastoreCatalogConfig,
+            IcebergConfig icebergConfig)
+    {
+        this.catalogName = requireNonNull(catalogName, "catalogName is null");
+        this.typeManager = requireNonNull(typeManager, "typeManager is null");
+        this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+        this.fileIoFactory = requireNonNull(fileIoFactory, "fileIoFactory is null");
+        this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
+        this.bigQueryMetastoreClient = requireNonNull(bigQueryMetastoreClient, "bigQueryMetastoreClient is null");
+        requireNonNull(icebergBigQueryMetastoreCatalogConfig, "icebergBigQueryMetastoreCatalogConfig is null");
+        requireNonNull(icebergConfig, "icebergConfig is null");
+        this.gcpLocation = icebergBigQueryMetastoreCatalogConfig.getLocation();
+        this.projectID = icebergBigQueryMetastoreCatalogConfig.getProjectID();
+        this.listAllTables = icebergBigQueryMetastoreCatalogConfig.getListAllTables();
+        this.warehouse = icebergBigQueryMetastoreCatalogConfig.getWarehouse();
+        this.isUniqueTableLocation = icebergConfig.isUniqueTableLocation();
+    }
+
+    /**
+     * Creates a catalog from the captured settings; {@code identity} is not consulted.
+     */
+    @Override
+    public TrinoCatalog create(ConnectorIdentity identity)
+    {
+        return new TrinoBigQueryMetastoreCatalog(
+                catalogName,
+                typeManager,
+                fileSystemFactory,
+                fileIoFactory,
+                tableOperationsProvider,
+                bigQueryMetastoreClient,
+                gcpLocation,
+                projectID,
+                listAllTables,
+                warehouse,
+                isUniqueTableLocation);
+    }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/bigquery/TestIcebergBigQueryMetastoreCatalogConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/bigquery/TestIcebergBigQueryMetastoreCatalogConfig.java
new file mode 100644
index
000000000000..667759c7f4b3
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/bigquery/TestIcebergBigQueryMetastoreCatalogConfig.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.bigquery;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
+import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
+import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+
+/**
+ * Checks the defaults and the property names of {@link IcebergBigQueryMetastoreCatalogConfig}.
+ */
+public class TestIcebergBigQueryMetastoreCatalogConfig
+{
+    /** Every setting is expected to default to {@code null}. */
+    @Test
+    public void testDefaults()
+    {
+        IcebergBigQueryMetastoreCatalogConfig defaults = recordDefaults(IcebergBigQueryMetastoreCatalogConfig.class)
+                .setProjectID(null)
+                .setLocation(null)
+                .setListAllTables(null)
+                .setWarehouse(null)
+                .setJsonKeyFilePath(null);
+
+        assertRecordedDefaults(defaults);
+    }
+
+    /** Each {@code iceberg.bqms-catalog.*} property must reach its matching setter. */
+    @Test
+    public void testExplicitPropertyMapping()
+    {
+        IcebergBigQueryMetastoreCatalogConfig expectedConfig = new IcebergBigQueryMetastoreCatalogConfig()
+                .setProjectID("test-project-id")
+                .setLocation("us-central1")
+                .setListAllTables("true")
+                .setWarehouse("gs://test-bucket/warehouse")
+                .setJsonKeyFilePath("/path/to/service-account.json");
+
+        Map<String, String> configProperties = ImmutableMap.of(
+                "iceberg.bqms-catalog.project-id", "test-project-id",
+                "iceberg.bqms-catalog.location", "us-central1",
+                "iceberg.bqms-catalog.list-all-tables", "true",
+                "iceberg.bqms-catalog.warehouse", "gs://test-bucket/warehouse",
+                "iceberg.bqms-catalog.json-key-file-path", "/path/to/service-account.json");
+
+        assertFullMapping(configProperties, expectedConfig);
+    }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/bigquery/TestIcebergBigQueryMetastoreCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/bigquery/TestIcebergBigQueryMetastoreCatalogConnectorSmokeTest.java
new file mode 100644
index 000000000000..2dd219a1ba54
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/bigquery/TestIcebergBigQueryMetastoreCatalogConnectorSmokeTest.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.bigquery;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.trino.filesystem.FileIterator;
+import io.trino.filesystem.Location;
+import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest;
+import io.trino.plugin.iceberg.IcebergConfig;
+import io.trino.plugin.iceberg.IcebergQueryRunner;
+import io.trino.plugin.iceberg.SchemaInitializer;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorBehavior;
+import io.trino.tpch.TpchTable;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.io.FileIO;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.parallel.Execution;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import static com.google.common.io.MoreFiles.deleteRecursively;
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static io.trino.plugin.iceberg.IcebergTestUtils.FILE_IO_FACTORY;
+import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting;
+import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting;
+import static io.trino.testing.SystemEnvironmentUtils.requireEnv;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static java.lang.String.format;
+import static org.apache.iceberg.FileFormat.PARQUET;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
+import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
+
+/**
+ * Smoke test for the Iceberg connector running against a BigQuery Metastore catalog.
+ * Requires the following environment variables to be set:
+ * - BIGQUERY_PROJECT_ID: GCP project ID
+ * - BIGQUERY_LOCATION: GCP location (e.g., us-central1)
+ * - BIGQUERY_WAREHOUSE: GCS warehouse location (e.g., gs://bucket/warehouse)
+ * - BIGQUERY_CREDENTIALS_PATH: Path to service account JSON key file
+ *
+ * Operations the catalog rejects (views, materialized views, renames, register/unregister)
+ * are overridden to assert the rejection message instead of the base class behavior.
+ */
+@TestInstance(PER_CLASS)
+@Execution(SAME_THREAD)
+public class TestIcebergBigQueryMetastoreCatalogConnectorSmokeTest
+        extends BaseIcebergConnectorSmokeTest
+{
+    // local scratch directory handed to the query runner; removed in teardown()
+    private Path tempDir;
+
+    public TestIcebergBigQueryMetastoreCatalogConnectorSmokeTest()
+    {
+        // run with the connector's default file format
+        super(new IcebergConfig().getFileFormat().toIceberg());
+    }
+
+    @AfterAll
+    public void teardown()
+            throws IOException
+    {
+        // createQueryRunner may have failed before the directory was created
+        if (tempDir != null) {
+            deleteRecursively(tempDir, ALLOW_INSECURE);
+        }
+    }
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        String projectId = requireEnv("BIGQUERY_PROJECT_ID");
+        String location = requireEnv("BIGQUERY_LOCATION");
+        String warehouse = requireEnv("BIGQUERY_WAREHOUSE");
+        String credentialsPath = requireEnv("BIGQUERY_CREDENTIALS_PATH");
+
+        tempDir = Files.createTempDirectory("test_trino_bigquery_metastore_catalog");
+
+        return IcebergQueryRunner.builder()
+                .setBaseDataDir(Optional.of(tempDir))
+                .setIcebergProperties(
+                        ImmutableMap.of(
+                                "iceberg.file-format", format.name(),
+                                "iceberg.catalog.type", "bigquery_metastore",
+                                "iceberg.bqms-catalog.project-id", projectId,
+                                "iceberg.bqms-catalog.location", location,
+                                "iceberg.bqms-catalog.warehouse", warehouse,
+                                "iceberg.bqms-catalog.json-key-file-path", credentialsPath,
+                                "iceberg.bqms-catalog.list-all-tables", "false",
+                                "iceberg.writer-sort-buffer-size", "1MB",
+                                "iceberg.allowed-extra-properties", "write.metadata.delete-after-commit.enabled,write.metadata.previous-versions-max"))
+                .setSchemaInitializer(
+                        SchemaInitializer.builder()
+                                .withClonedTpchTables(ImmutableList.<TpchTable<?>>builder()
+                                        .addAll(REQUIRED_TPCH_TABLES)
+                                        .build())
+                                .build())
+                .build();
+    }
+
+    @Override
+    protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
+    {
+        return switch (connectorBehavior) {
+            case SUPPORTS_CREATE_VIEW,
+                 SUPPORTS_CREATE_MATERIALIZED_VIEW,
+                 SUPPORTS_RENAME_SCHEMA,
+                 SUPPORTS_RENAME_TABLE -> false;
+            default -> super.hasBehavior(connectorBehavior);
+        };
+    }
+
+    @Test
+    @Override
+    public void testView()
+    {
+        assertThatThrownBy(super::testView)
+                .hasStackTraceContaining("createView is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testMaterializedView()
+    {
+        assertThatThrownBy(super::testMaterializedView)
+                .hasStackTraceContaining("createMaterializedView is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testRenameSchema()
+    {
+        assertThatThrownBy(super::testRenameSchema)
+                .hasStackTraceContaining("renameNamespace is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testRenameTable()
+    {
+        assertThatThrownBy(super::testRenameTable)
+                .hasStackTraceContaining("Table rename is not supported for BigQuery Metastore");
+    }
+
+    @Override
+    protected void dropTableFromMetastore(String tableName)
+    {
+        // used when registering a table, which is not supported by the BigQuery Metastore catalog
+    }
+
+    /**
+     * Returns the location of the table's current metadata file.
+     *
+     * <p>The test has no handle on the BigQuery Metastore client, so the newest entry of the
+     * table's {@code $metadata_log_entries} system table is used instead.
+     */
+    @Override
+    protected String getMetadataLocation(String tableName)
+    {
+        return (String) computeScalar(
+                "SELECT file FROM \"" + tableName + "$metadata_log_entries\" ORDER BY \"timestamp\" DESC LIMIT 1");
+    }
+
+    @Test
+    @Override
+    public void testRegisterTableWithTableLocation()
+    {
+        assertThatThrownBy(super::testRegisterTableWithTableLocation)
+                .hasMessageContaining("registerTable is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testRegisterTableWithComments()
+    {
+        assertThatThrownBy(super::testRegisterTableWithComments)
+                .hasMessageContaining("registerTable is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testRegisterTableWithShowCreateTable()
+    {
+        assertThatThrownBy(super::testRegisterTableWithShowCreateTable)
+                .hasMessageContaining("registerTable is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testRegisterTableWithReInsert()
+    {
+        assertThatThrownBy(super::testRegisterTableWithReInsert)
+                .hasMessageContaining("registerTable is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testRegisterTableWithDroppedTable()
+    {
+        assertThatThrownBy(super::testRegisterTableWithDroppedTable)
+                .hasMessageContaining("registerTable is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testRegisterTableWithDifferentTableName()
+    {
+        assertThatThrownBy(super::testRegisterTableWithDifferentTableName)
+                .hasMessageContaining("registerTable is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testRegisterTableWithMetadataFile()
+    {
+        assertThatThrownBy(super::testRegisterTableWithMetadataFile)
+                .hasMessageContaining("registerTable is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testRegisterTableWithTrailingSpaceInLocation()
+    {
+        assertThatThrownBy(super::testRegisterTableWithTrailingSpaceInLocation)
+                .hasMessageContaining("registerTable is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testUnregisterTable()
+    {
+        assertThatThrownBy(super::testUnregisterTable)
+                .hasStackTraceContaining("unregisterTable is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testUnregisterBrokenTable()
+    {
+        assertThatThrownBy(super::testUnregisterBrokenTable)
+                .hasStackTraceContaining("unregisterTable is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testUnregisterTableNotExistingTable()
+    {
+        assertThatThrownBy(super::testUnregisterTableNotExistingTable)
+                .hasStackTraceContaining("unregisterTable is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testRepeatUnregisterTable()
+    {
+        assertThatThrownBy(super::testRepeatUnregisterTable)
+                .hasStackTraceContaining("unregisterTable is not supported for BigQuery Metastore");
+    }
+
+    @Test
+    @Override
+    public void testDropTableWithMissingMetadataFile()
+            throws Exception
+    {
+        String tableName = "test_drop_table_with_missing_metadata_file_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);
+
+        Location metadataLocation = Location.of(getMetadataLocation(tableName));
+        Location tableLocation = Location.of(getTableLocation(tableName));
+
+        // Delete current metadata file
+        fileSystem.deleteFile(metadataLocation);
+        assertThat(fileSystem.newInputFile(metadataLocation).exists())
+                .describedAs("Current metadata file should not exist")
+                .isFalse();
+
+        // try to drop table; the remaining files are expected to stay in place
+        assertUpdate("DROP TABLE " + tableName);
+        assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
+        assertThat(fileSystem.listFiles(tableLocation).hasNext())
+                .describedAs("Table location should exist")
+                .isTrue();
+    }
+
+    @Test
+    @Override
+    public void testDropTableWithMissingSnapshotFile()
+            throws Exception
+    {
+        String tableName = "test_drop_table_with_missing_snapshot_file_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);
+
+        String metadataLocation = getMetadataLocation(tableName);
+        TableMetadata tableMetadata = TableMetadataParser.read(FILE_IO_FACTORY.create(fileSystem), metadataLocation);
+        Location tableLocation = Location.of(tableMetadata.location());
+        Location currentSnapshotFile = Location.of(tableMetadata.currentSnapshot().manifestListLocation());
+
+        // Delete current snapshot file
+        fileSystem.deleteFile(currentSnapshotFile);
+        assertThat(fileSystem.newInputFile(currentSnapshotFile).exists())
+                .describedAs("Current snapshot file should not exist")
+                .isFalse();
+
+        // try to drop table; the remaining files are expected to stay in place
+        assertUpdate("DROP TABLE " + tableName);
+        assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
+        assertThat(fileSystem.listFiles(tableLocation).hasNext())
+                .describedAs("Table location should exist")
+                .isTrue();
+    }
+
+    @Test
+    @Override
+    public void testDropTableWithMissingManifestListFile()
+            throws Exception
+    {
+        String tableName = "test_drop_table_with_missing_manifest_list_file_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);
+
+        String metadataLocation = getMetadataLocation(tableName);
+        FileIO fileIo = FILE_IO_FACTORY.create(fileSystem);
+        TableMetadata tableMetadata = TableMetadataParser.read(fileIo, metadataLocation);
+        Location tableLocation = Location.of(tableMetadata.location());
+        // path of the first manifest referenced by the current snapshot
+        Location manifestListFile = Location.of(tableMetadata.currentSnapshot().allManifests(fileIo).get(0).path());
+
+        // Delete Manifest List file
+        fileSystem.deleteFile(manifestListFile);
+        assertThat(fileSystem.newInputFile(manifestListFile).exists())
+                .describedAs("Manifest list file should not exist")
+                .isFalse();
+
+        // try to drop table; the remaining files are expected to stay in place
+        assertUpdate("DROP TABLE " + tableName);
+        assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
+        assertThat(fileSystem.listFiles(tableLocation).hasNext())
+                .describedAs("Table location should exist")
+                .isTrue();
+    }
+
+    @Test
+    @Override
+    public void testDropTableWithMissingDataFile()
+            throws Exception
+    {
+        String tableName = "test_drop_table_with_missing_data_file_" + randomNameSuffix();
+        assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 x, 'INDIA' y", 1);
+        assertUpdate("INSERT INTO " + tableName + " VALUES (2, 'POLAND')", 1);
+
+        Location tableLocation = Location.of(getTableLocation(tableName));
+        Location tableDataPath = tableLocation.appendPath("data");
+        FileIterator fileIterator = fileSystem.listFiles(tableDataPath);
+        assertThat(fileIterator.hasNext()).isTrue();
+        Location dataFile = fileIterator.next().location();
+
+        // Delete data file
+        fileSystem.deleteFile(dataFile);
+        assertThat(fileSystem.newInputFile(dataFile).exists())
+                .describedAs("Data file should not exist")
+                .isFalse();
+
+        // try to drop table; the remaining files are expected to stay in place
+        assertUpdate("DROP TABLE " + tableName);
+        assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
+        assertThat(fileSystem.listFiles(tableLocation).hasNext())
+                .describedAs("Table location should exist")
+                .isTrue();
+    }
+
+    @Override
+    protected boolean isFileSorted(Location path, String sortColumnName)
+    {
+        if (format == PARQUET) {
+            return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName);
+        }
+        return checkOrcFileSorting(fileSystem, path, sortColumnName);
+    }
+
+    @Override
+    protected void deleteDirectory(String location)
+    {
+        // used when unregistering a table, which is not supported by the BigQuery Metastore catalog
+    }
+
+    @Override
+    protected String schemaPath()
+    {
+        // NOTE(review): resolves against the local temp dir although the catalog warehouse comes
+        // from BIGQUERY_WAREHOUSE - confirm this is the path the base class expects
+        return format("%s/%s", tempDir, getSession().getSchema().orElseThrow());
+    }
+
+    @Override
+    protected boolean locationExists(String location)
+    {
+        // NOTE(review): checks the local file system only - confirm callers never pass warehouse (gs://) locations
+        return Files.exists(Path.of(location));
+    }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/bigquery/TestTrinoBigQueryMetastoreCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/bigquery/TestTrinoBigQueryMetastoreCatalog.java
new file mode 100644
index 000000000000..236f4a59c445
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/bigquery/TestTrinoBigQueryMetastoreCatalog.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.bigquery;
+
+import io.trino.plugin.iceberg.catalog.BaseTrinoCatalogTest;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import org.junit.jupiter.api.Disabled;
+
+/**
+ * Placeholder for the shared catalog test suite against BigQuery Metastore.
+ * It is disabled: BigQueryMetastoreClientImpl is final, so no test double can stand in for it here.
+ * Coverage comes from the connector smoke test, which talks to real BigQuery infrastructure.
+ *
+ * Steps that would make this suite runnable:
+ * 1. Extract an interface from BigQueryMetastoreClientImpl
+ * 2. Have the catalog implementation depend on that interface
+ * 3. Provide a test implementation of the interface
+ */
+@Disabled("BigQueryMetastoreClientImpl is final and cannot be mocked. Use TestIcebergBigQueryMetastoreCatalogConnectorSmokeTest instead.")
+public class TestTrinoBigQueryMetastoreCatalog
+        extends BaseTrinoCatalogTest
+{
+    @Override
+    protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations)
+    {
+        throw new UnsupportedOperationException("This test is disabled. Use integration test instead.");
+    }
+}
diff --git a/pom.xml b/pom.xml
index 9fb7d20ea1d2..493510ff4d8f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1970,6 +1970,12 @@
                 <version>${dep.iceberg.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.iceberg</groupId>
+                <artifactId>iceberg-bigquery</artifactId>
+                <version>${dep.iceberg.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.apache.iceberg</groupId>
                 <artifactId>iceberg-core</artifactId>