diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java
index b138a899c94cf..574e369464ec9 100644
--- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java
+++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/HiveStatisticsUtil.java
@@ -21,10 +21,8 @@
 import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
 import com.facebook.presto.spi.statistics.ComputedStatistics;
 import com.google.common.base.VerifyException;
-import com.google.common.collect.ImmutableMap;
 import org.joda.time.DateTimeZone;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
@@ -89,13 +87,6 @@ public static PartitionStatistics createPartitionStatistics(
         return createPartitionStatistics(session, columnTypes, computedStatistics, computedStatistics.getColumnStatistics().keySet(), timeZone);
     }
 
-    public static Map<ColumnStatisticMetadata, Block> getColumnStatistics(Map<List<String>, ComputedStatistics> statistics, List<String> partitionValues)
-    {
-        return Optional.ofNullable(statistics.get(partitionValues))
-                .map(ComputedStatistics::getColumnStatistics)
-                .orElse(ImmutableMap.of());
-    }
-
     // TODO: Collect file count, on-disk size and in-memory size during ANALYZE
 
     /**
diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java
index 2c96999e3242d..6287588e4c0b0 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java
@@ -15,7 +15,9 @@
 import com.facebook.airlift.json.JsonCodec;
 import com.facebook.airlift.json.smile.SmileCodec;
+import com.facebook.presto.common.Page;
 import com.facebook.presto.common.Subfield;
+import com.facebook.presto.common.block.Block;
 import com.facebook.presto.common.predicate.NullableValue;
 import com.facebook.presto.common.predicate.TupleDomain;
 import com.facebook.presto.common.type.ArrayType;
@@ -231,7 +233,6 @@
 import static com.facebook.presto.hive.HiveSessionProperties.isUsePageFileForHiveUnsupportedType;
 import static com.facebook.presto.hive.HiveSessionProperties.shouldCreateEmptyBucketFilesForTemporaryTable;
 import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics;
-import static com.facebook.presto.hive.HiveStatisticsUtil.getColumnStatistics;
 import static com.facebook.presto.hive.HiveStorageFormat.AVRO;
 import static com.facebook.presto.hive.HiveStorageFormat.DWRF;
 import static com.facebook.presto.hive.HiveStorageFormat.ORC;
@@ -279,6 +280,7 @@
 import static com.facebook.presto.hive.HiveUtil.encodeViewData;
 import static com.facebook.presto.hive.HiveUtil.getPartitionKeyColumnHandles;
 import static com.facebook.presto.hive.HiveUtil.hiveColumnHandles;
+import static com.facebook.presto.hive.HiveUtil.parsePartitionValue;
 import static com.facebook.presto.hive.HiveUtil.translateHiveUnsupportedTypeForTemporaryTable;
 import static com.facebook.presto.hive.HiveUtil.translateHiveUnsupportedTypesForTemporaryTable;
 import static com.facebook.presto.hive.HiveUtil.verifyPartitionTypeSupported;
@@ -304,17 +306,20 @@
 import static com.facebook.presto.hive.metastore.MetastoreUtil.TABLE_COMMENT;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.checkIfNullView;
+import static com.facebook.presto.hive.metastore.MetastoreUtil.createPartitionValues;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.createTableObjectForViewCreation;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.createViewProperties;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.extractPartitionValues;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.getMetastoreHeaders;
+import static com.facebook.presto.hive.metastore.MetastoreUtil.getPartitionNames;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.getPartitionNamesWithEmptyVersion;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.getProtectMode;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.isDeltaLakeTable;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.isIcebergTable;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.isPrestoView;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.isUserDefinedTypeEncodingEnabled;
+import static com.facebook.presto.hive.metastore.MetastoreUtil.makePartName;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.toPartitionValues;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyAndPopulateViews;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyOnline;
@@ -1524,15 +1529,20 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
             metastore.setTableStatistics(metastoreContext, table, createPartitionStatistics(session, columnTypes, computedStatisticsMap.get(ImmutableList.of()), timeZone));
         }
         else {
+            List<String> partitionNames;
             List<List<String>> partitionValuesList;
             if (handle.getAnalyzePartitionValues().isPresent()) {
                 partitionValuesList = handle.getAnalyzePartitionValues().get();
+                partitionNames = partitionValuesList.stream()
+                        .map(partitionValues -> makePartName(partitionColumns, partitionValues))
+                        .collect(toImmutableList());
             }
             else {
-                partitionValuesList = metastore.getPartitionNames(metastoreContext, handle.getSchemaName(), handle.getTableName())
-                        .orElseThrow(() -> new TableNotFoundException(((HiveTableHandle) tableHandle).getSchemaTableName()))
+                partitionNames = getPartitionNames(metastore.getPartitionNames(metastoreContext, handle.getSchemaName(), handle.getTableName())
+                        .orElseThrow(() -> new TableNotFoundException(tableName)));
+                partitionValuesList = partitionNames
                         .stream()
-                        .map(partitionNameWithVersion -> MetastoreUtil.toPartitionValues(partitionNameWithVersion.getPartitionName()))
+                        .map(MetastoreUtil::toPartitionValues)
                         .collect(toImmutableList());
             }
 
@@ -1544,8 +1554,18 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
             Supplier<PartitionStatistics> emptyPartitionStatistics = Suppliers.memoize(() -> createEmptyPartitionStatistics(columnTypes, columnStatisticTypes));
 
             int usedComputedStatistics = 0;
-            for (List<String> partitionValues : partitionValuesList) {
-                ComputedStatistics collectedStatistics = computedStatisticsMap.get(partitionValues);
+            List<String> partitionedBy = table.getPartitionColumns().stream()
+                    .map(Column::getName)
+                    .collect(toImmutableList());
+            List<Type> partitionTypes = partitionedBy.stream()
+                    .map(columnTypes::get)
+                    .collect(toImmutableList());
+            for (int i = 0; i < partitionNames.size(); i++) {
+                String partitionName = partitionNames.get(i);
+                List<String> partitionValues = partitionValuesList.get(i);
+                ComputedStatistics collectedStatistics = computedStatisticsMap.containsKey(partitionValues)
+                        ? computedStatisticsMap.get(partitionValues)
+                        : computedStatisticsMap.get(canonicalizePartitionValues(partitionName, partitionValues, partitionTypes));
                 if (collectedStatistics == null) {
                     partitionStatistics.put(partitionValues, emptyPartitionStatistics.get());
                 }
@@ -1559,6 +1579,38 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
         }
     }
 
+    private static Map<ColumnStatisticMetadata, Block> getColumnStatistics(Map<List<String>, ComputedStatistics> statistics, List<String> partitionValues)
+    {
+        return Optional.ofNullable(statistics.get(partitionValues))
+                .map(ComputedStatistics::getColumnStatistics)
+                .orElse(ImmutableMap.of());
+    }
+
+    private Map<ColumnStatisticMetadata, Block> getColumnStatistics(
+            Map<List<String>, ComputedStatistics> statistics,
+            String partitionName,
+            List<String> partitionValues,
+            List<Type> partitionTypes)
+    {
+        Optional<Map<ColumnStatisticMetadata, Block>> columnStatistics = Optional.ofNullable(statistics.get(partitionValues))
+                .map(ComputedStatistics::getColumnStatistics);
+        return columnStatistics
+                .orElseGet(() -> getColumnStatistics(statistics, canonicalizePartitionValues(partitionName, partitionValues, partitionTypes)));
+    }
+
+    private List<String> canonicalizePartitionValues(String partitionName, List<String> partitionValues, List<Type> partitionTypes)
+    {
+        verify(partitionValues.size() == partitionTypes.size(), "Expected partitionTypes size to be %s but got %s", partitionValues.size(), partitionTypes.size());
+        Block[] parsedPartitionValuesBlocks = new Block[partitionValues.size()];
+        for (int i = 0; i < partitionValues.size(); i++) {
+            String partitionValue = partitionValues.get(i);
+            Type partitionType = partitionTypes.get(i);
+            parsedPartitionValuesBlocks[i] = parsePartitionValue(partitionName, partitionValue, partitionType, timeZone).asBlock();
+        }
+
+        return createPartitionValues(partitionTypes, new Page(parsedPartitionValuesBlocks), 0);
+    }
+
     @Override
     public HiveOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorNewTableLayout> layout)
     {
@@ -2089,12 +2141,16 @@ else if (partitionUpdate.getUpdateMode() == APPEND) {
                     throw new PrestoException(HIVE_UNSUPPORTED_ENCRYPTION_OPERATION, "Inserting into an existing partition with encryption enabled is not supported yet");
                 }
                 // insert into existing partition
-                List<String> partitionValues = toPartitionValues(partitionUpdate.getName());
+                String partitionName = partitionUpdate.getName();
+                List<String> partitionValues = toPartitionValues(partitionName);
+                List<Type> partitionTypes = partitionedBy.stream()
+                        .map(columnTypes::get)
+                        .collect(toImmutableList());
                 PartitionStatistics partitionStatistics = createPartitionStatistics(
                         session,
                         partitionUpdate.getStatistics(),
                         columnTypes,
-                        getColumnStatistics(partitionComputedStatistics, partitionValues), timeZone);
+                        getColumnStatistics(partitionComputedStatistics, partitionName, partitionValues, partitionTypes), timeZone);
                 metastore.finishInsertIntoExistingPartition(
                         session,
                         handle.getSchemaName(),
@@ -2150,11 +2206,16 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
                     throw new PrestoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Partition format changed during insert");
                 }
 
+                String partitionName = partitionUpdate.getName();
+                List<String> partitionValues = partition.getValues();
+                List<Type> partitionTypes = partitionedBy.stream()
+                        .map(columnTypes::get)
+                        .collect(toImmutableList());
                 PartitionStatistics partitionStatistics = createPartitionStatistics(
                         session,
                         partitionUpdate.getStatistics(),
                         columnTypes,
-                        getColumnStatistics(partitionComputedStatistics, partition.getValues()),
+                        getColumnStatistics(partitionComputedStatistics, partitionName, partitionValues, partitionTypes),
                         timeZone);
 
                 // New partition or overwriting existing partition by staging and moving the new partition
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveAnalyze.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveAnalyze.java
new file mode 100644
index 0000000000000..0a06dc8e0b00e
--- /dev/null
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveAnalyze.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.hive;
+
+import com.facebook.presto.Session;
+import com.facebook.presto.common.type.TimeZoneKey;
+import com.facebook.presto.testing.QueryRunner;
+import com.facebook.presto.tests.AbstractTestQueryFramework;
+import com.facebook.presto.tests.DistributedQueryRunner;
+import com.google.common.collect.ImmutableMap;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Map;
+
+import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
+import static java.lang.String.format;
+
+@Test
+public class TestHiveAnalyze
+        extends AbstractTestQueryFramework
+{
+    private static final String CATALOG = "hive";
+    private static final String SCHEMA = "test_analyze_schema";
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        Session session = testSessionBuilder().setCatalog(CATALOG).setSchema(SCHEMA).setTimeZoneKey(TimeZoneKey.UTC_KEY).build();
+        DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).setExtraProperties(ImmutableMap.<String, String>builder().build()).build();
+
+        queryRunner.installPlugin(new HivePlugin(CATALOG));
+        Path catalogDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("hive_data").getParent().resolve("catalog");
+        Map<String, String> properties = ImmutableMap.<String, String>builder()
+                .put("hive.metastore", "file")
+                .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
+                .put("hive.allow-drop-table", "true")
+                .put("hive.non-managed-table-writes-enabled", "true")
+                .put("hive.parquet.use-column-names", "true")
+                .build();
+
+        queryRunner.createCatalog(CATALOG, CATALOG, properties);
+        queryRunner.execute(format("CREATE SCHEMA %s.%s", CATALOG, SCHEMA));
+
+        return queryRunner;
+    }
+
+    @Test
+    public void testAnalyzePartitionedTableWithNonCanonicalValues()
+            throws IOException
+    {
+        String tableName = "test_analyze_table_canonicalization";
+        assertUpdate(format("CREATE TABLE %s (a_varchar varchar, month varchar) WITH (partitioned_by = ARRAY['month'], external_location='%s')", tableName, com.google.common.io.Files.createTempDir().getPath()));
+
+        assertUpdate(format("INSERT INTO %s VALUES ('A', '01'), ('B', '01'), ('C', '02'), ('D', '03')", tableName), 4);
+
+        String tableLocation = (String) computeActual(format("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*/[^/]*$', '') FROM %s", tableName)).getOnlyValue();
+
+        String externalTableName = "external_" + tableName;
+        assertUpdate(format(
+                "CREATE TABLE %s (a_varchar varchar, month integer) WITH (partitioned_by = ARRAY['month'], external_location='%s')", externalTableName, tableLocation));
+
+        assertUpdate(format("CALL system.sync_partition_metadata('%s', '%s', 'ADD')", SCHEMA, externalTableName));
+        assertQuery(format("SELECT * FROM \"%s$partitions\"", externalTableName), "SELECT * FROM VALUES 1, 2, 3");
+        assertUpdate(format("ANALYZE %s", externalTableName), 4);
+        assertQuery(format("SHOW STATS FOR %s", externalTableName),
+                "SELECT * FROM VALUES " +
+                        "('a_varchar', 4.0, 2.0, 0.0, null, null, null, null), " +
+                        "('month', null, 3.0, 0.0, null, 1, 3, null), " +
+                        "(null, null, null, null, 4.0, null, null, null)");
+
+        assertUpdate(format("INSERT INTO %s VALUES ('E', '04')", tableName), 1);
+        assertUpdate(format("CALL system.sync_partition_metadata('%s', '%s', 'ADD')", SCHEMA, externalTableName));
+        assertQuery(format("SELECT * FROM \"%s$partitions\"", externalTableName), "SELECT * FROM VALUES 1, 2, 3, 4");
+        assertUpdate(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['04']])", externalTableName), 1);
+        assertQuery(format("SHOW STATS FOR %s", externalTableName),
+                "SELECT * FROM VALUES " +
+                        "('a_varchar', 5.0, 2.0, 0.0, null, null, null, null), " +
+                        "('month', null, 4.0, 0.0, null, 1, 4, null), " +
+                        "(null, null, null, null, 5.0, null, null, null)");
+        // TODO fix selective ANALYZE for table with non-canonical partition values
+        assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['4']])", externalTableName),
+                format("Partition no longer exists: %s.%s/month=4", SCHEMA, externalTableName));
+
+        assertUpdate(format("DROP TABLE %s", tableName));
+    }
+}
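
Note (reviewer aside, not part of the patch): ANALYZE keys its computed statistics by canonical partition values, while partition names registered in the metastore can carry non-canonical strings such as month=01 for an integer partition column, so the plain map lookup used to miss. The sketch below is a minimal, self-contained illustration of that canonicalization-before-lookup idea using plain Java types instead of Presto's Type/Block/parsePartitionValue machinery; the class and method names here are hypothetical and are not code from this PR.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: mirrors the idea behind canonicalizePartitionValues in this patch.
public final class PartitionValueCanonicalizationSketch
{
    private PartitionValueCanonicalizationSketch() {}

    // Re-render each raw partition value through its declared column type so that,
    // for example, "01" for an integer partition column becomes "1".
    public static List<String> canonicalize(List<String> rawValues, List<String> columnTypes)
    {
        List<String> canonical = new ArrayList<>();
        for (int i = 0; i < rawValues.size(); i++) {
            String raw = rawValues.get(i);
            canonical.add("integer".equals(columnTypes.get(i)) ? Long.toString(Long.parseLong(raw)) : raw);
        }
        return canonical;
    }

    public static void main(String[] args)
    {
        // The metastore partition name "month=01" yields the raw value "01", but statistics
        // computed by ANALYZE are keyed by the canonical value "1"; canonicalizing before
        // the map lookup makes the two meet.
        System.out.println(canonicalize(List.of("01"), List.of("integer"))); // prints [1]
    }
}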