Skip to content

Fix ANALYZE when Hive partition has non-canonical value #24973

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableMap;
import org.joda.time.DateTimeZone;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
Expand Down Expand Up @@ -89,13 +87,6 @@ public static PartitionStatistics createPartitionStatistics(
return createPartitionStatistics(session, columnTypes, computedStatistics, computedStatistics.getColumnStatistics().keySet(), timeZone);
}

/**
 * Looks up the column statistics computed for the given partition values.
 *
 * @param statistics computed statistics keyed by partition values
 * @param partitionValues the partition values to look up
 * @return the column statistics for that partition, or an empty map when none were computed
 */
public static Map<ColumnStatisticMetadata, Block> getColumnStatistics(Map<List<String>, ComputedStatistics> statistics, List<String> partitionValues)
{
    ComputedStatistics computed = statistics.get(partitionValues);
    if (computed == null) {
        return ImmutableMap.of();
    }
    return computed.getColumnStatistics();
}

// TODO: Collect file count, on-disk size and in-memory size during ANALYZE

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.smile.SmileCodec;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.ArrayType;
Expand Down Expand Up @@ -231,7 +233,6 @@
import static com.facebook.presto.hive.HiveSessionProperties.isUsePageFileForHiveUnsupportedType;
import static com.facebook.presto.hive.HiveSessionProperties.shouldCreateEmptyBucketFilesForTemporaryTable;
import static com.facebook.presto.hive.HiveStatisticsUtil.createPartitionStatistics;
import static com.facebook.presto.hive.HiveStatisticsUtil.getColumnStatistics;
import static com.facebook.presto.hive.HiveStorageFormat.AVRO;
import static com.facebook.presto.hive.HiveStorageFormat.DWRF;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
Expand Down Expand Up @@ -279,6 +280,7 @@
import static com.facebook.presto.hive.HiveUtil.encodeViewData;
import static com.facebook.presto.hive.HiveUtil.getPartitionKeyColumnHandles;
import static com.facebook.presto.hive.HiveUtil.hiveColumnHandles;
import static com.facebook.presto.hive.HiveUtil.parsePartitionValue;
import static com.facebook.presto.hive.HiveUtil.translateHiveUnsupportedTypeForTemporaryTable;
import static com.facebook.presto.hive.HiveUtil.translateHiveUnsupportedTypesForTemporaryTable;
import static com.facebook.presto.hive.HiveUtil.verifyPartitionTypeSupported;
Expand All @@ -304,17 +306,20 @@
import static com.facebook.presto.hive.metastore.MetastoreUtil.TABLE_COMMENT;
import static com.facebook.presto.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
import static com.facebook.presto.hive.metastore.MetastoreUtil.checkIfNullView;
import static com.facebook.presto.hive.metastore.MetastoreUtil.createPartitionValues;
import static com.facebook.presto.hive.metastore.MetastoreUtil.createTableObjectForViewCreation;
import static com.facebook.presto.hive.metastore.MetastoreUtil.createViewProperties;
import static com.facebook.presto.hive.metastore.MetastoreUtil.extractPartitionValues;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getMetastoreHeaders;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getPartitionNames;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getPartitionNamesWithEmptyVersion;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getProtectMode;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isDeltaLakeTable;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isIcebergTable;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isPrestoView;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isUserDefinedTypeEncodingEnabled;
import static com.facebook.presto.hive.metastore.MetastoreUtil.makePartName;
import static com.facebook.presto.hive.metastore.MetastoreUtil.toPartitionValues;
import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyAndPopulateViews;
import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyOnline;
Expand Down Expand Up @@ -1524,15 +1529,20 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
metastore.setTableStatistics(metastoreContext, table, createPartitionStatistics(session, columnTypes, computedStatisticsMap.get(ImmutableList.<String>of()), timeZone));
}
else {
List<String> partitionNames;
List<List<String>> partitionValuesList;
if (handle.getAnalyzePartitionValues().isPresent()) {
partitionValuesList = handle.getAnalyzePartitionValues().get();
partitionNames = partitionValuesList.stream()
.map(partitionValues -> makePartName(partitionColumns, partitionValues))
.collect(toImmutableList());
}
else {
partitionValuesList = metastore.getPartitionNames(metastoreContext, handle.getSchemaName(), handle.getTableName())
.orElseThrow(() -> new TableNotFoundException(((HiveTableHandle) tableHandle).getSchemaTableName()))
partitionNames = getPartitionNames(metastore.getPartitionNames(metastoreContext, handle.getSchemaName(), handle.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName)));
partitionValuesList = partitionNames
.stream()
.map(partitionNameWithVersion -> MetastoreUtil.toPartitionValues(partitionNameWithVersion.getPartitionName()))
.map(MetastoreUtil::toPartitionValues)
.collect(toImmutableList());
}

Expand All @@ -1544,8 +1554,18 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
Supplier<PartitionStatistics> emptyPartitionStatistics = Suppliers.memoize(() -> createEmptyPartitionStatistics(columnTypes, columnStatisticTypes));

int usedComputedStatistics = 0;
for (List<String> partitionValues : partitionValuesList) {
ComputedStatistics collectedStatistics = computedStatisticsMap.get(partitionValues);
List<String> partitionedBy = table.getPartitionColumns().stream()
.map(Column::getName)
.collect(toImmutableList());
List<Type> partitionTypes = partitionedBy.stream()
.map(columnTypes::get)
.collect(toImmutableList());
for (int i = 0; i < partitionNames.size(); i++) {
String partitionName = partitionNames.get(i);
List<String> partitionValues = partitionValuesList.get(i);
ComputedStatistics collectedStatistics = computedStatisticsMap.containsKey(partitionValues)
? computedStatisticsMap.get(partitionValues)
: computedStatisticsMap.get(canonicalizePartitionValues(partitionName, partitionValues, partitionTypes));
if (collectedStatistics == null) {
partitionStatistics.put(partitionValues, emptyPartitionStatistics.get());
}
Expand All @@ -1559,6 +1579,38 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
}
}

/**
 * Returns the column statistics computed for the exact partition values given,
 * or an empty map when no entry exists for those values.
 */
private static Map<ColumnStatisticMetadata, Block> getColumnStatistics(Map<List<String>, ComputedStatistics> statistics, List<String> partitionValues)
{
    ComputedStatistics computedStatistics = statistics.get(partitionValues);
    if (computedStatistics != null) {
        return computedStatistics.getColumnStatistics();
    }
    return ImmutableMap.of();
}

/**
 * Looks up column statistics for a partition, first by the raw partition values
 * and then, when that misses, by their canonical form. The fallback handles
 * partitions whose stored values are non-canonical for the partition column
 * type (e.g. '01' where the canonical integer form is '1').
 */
private Map<ColumnStatisticMetadata, Block> getColumnStatistics(
        Map<List<String>, ComputedStatistics> statistics,
        String partitionName,
        List<String> partitionValues,
        List<Type> partitionTypes)
{
    ComputedStatistics computed = statistics.get(partitionValues);
    if (computed != null) {
        return computed.getColumnStatistics();
    }
    // Miss on the raw values: retry with the canonicalized representation.
    List<String> canonicalValues = canonicalizePartitionValues(partitionName, partitionValues, partitionTypes);
    return getColumnStatistics(statistics, canonicalValues);
}

/**
 * Converts possibly non-canonical partition value strings to their canonical
 * string form by parsing each value as its declared partition type and then
 * re-rendering the parsed values.
 */
private List<String> canonicalizePartitionValues(String partitionName, List<String> partitionValues, List<Type> partitionTypes)
{
    verify(partitionValues.size() == partitionTypes.size(), "Expected partitionTypes size to be %s but got %s", partitionValues.size(), partitionTypes.size());
    int valueCount = partitionValues.size();
    Block[] valueBlocks = new Block[valueCount];
    for (int index = 0; index < valueCount; index++) {
        // Parse the raw string with the column's type (timezone-sensitive types use this.timeZone).
        valueBlocks[index] = parsePartitionValue(partitionName, partitionValues.get(index), partitionTypes.get(index), timeZone).asBlock();
    }
    // Re-render the single row of parsed values back into canonical strings.
    return createPartitionValues(partitionTypes, new Page(valueBlocks), 0);
}

@Override
public HiveOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorNewTableLayout> layout)
{
Expand Down Expand Up @@ -2089,12 +2141,16 @@ else if (partitionUpdate.getUpdateMode() == APPEND) {
throw new PrestoException(HIVE_UNSUPPORTED_ENCRYPTION_OPERATION, "Inserting into an existing partition with encryption enabled is not supported yet");
}
// insert into existing partition
List<String> partitionValues = toPartitionValues(partitionUpdate.getName());
String partitionName = partitionUpdate.getName();
List<String> partitionValues = toPartitionValues(partitionName);
List<Type> partitionTypes = partitionedBy.stream()
.map(columnTypes::get)
.collect(toImmutableList());
PartitionStatistics partitionStatistics = createPartitionStatistics(
session,
partitionUpdate.getStatistics(),
columnTypes,
getColumnStatistics(partitionComputedStatistics, partitionValues), timeZone);
getColumnStatistics(partitionComputedStatistics, partitionName, partitionValues, partitionTypes), timeZone);
metastore.finishInsertIntoExistingPartition(
session,
handle.getSchemaName(),
Expand Down Expand Up @@ -2150,11 +2206,16 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
throw new PrestoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Partition format changed during insert");
}

String partitionName = partitionUpdate.getName();
List<String> partitionValues = partition.getValues();
List<Type> partitionTypes = partitionedBy.stream()
.map(columnTypes::get)
.collect(toImmutableList());
PartitionStatistics partitionStatistics = createPartitionStatistics(
session,
partitionUpdate.getStatistics(),
columnTypes,
getColumnStatistics(partitionComputedStatistics, partition.getValues()),
getColumnStatistics(partitionComputedStatistics, partitionName, partitionValues, partitionTypes),
timeZone);

// New partition or overwriting existing partition by staging and moving the new partition
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.Session;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;

import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;

// Integration test verifying that ANALYZE works on a Hive table whose partition
// directories hold values that are non-canonical for the declared partition
// column type (e.g. directory month=01 for an integer column whose canonical
// value is 1).
@Test
public class TestHiveAnalyze
        extends AbstractTestQueryFramework
{
    private static final String CATALOG = "hive";
    private static final String SCHEMA = "test_analyze_schema";

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        // UTC session keeps timezone-sensitive partition-value parsing deterministic.
        Session session = testSessionBuilder().setCatalog(CATALOG).setSchema(SCHEMA).setTimeZoneKey(TimeZoneKey.UTC_KEY).build();
        DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session).setExtraProperties(ImmutableMap.<String, String>builder().build()).build();

        queryRunner.installPlugin(new HivePlugin(CATALOG));
        // File-based metastore rooted in a "catalog" directory beside the runner's data dir.
        Path catalogDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("hive_data").getParent().resolve("catalog");
        Map<String, String> properties = ImmutableMap.<String, String>builder()
                .put("hive.metastore", "file")
                .put("hive.metastore.catalog.dir", catalogDirectory.toFile().toURI().toString())
                .put("hive.allow-drop-table", "true")
                .put("hive.non-managed-table-writes-enabled", "true")
                .put("hive.parquet.use-column-names", "true")
                .build();

        queryRunner.createCatalog(CATALOG, CATALOG, properties);
        queryRunner.execute(format("CREATE SCHEMA %s.%s", CATALOG, SCHEMA));

        return queryRunner;
    }

    @Test
    public void testAnalyzePartitionedTableWithNonCanonicalValues()
            throws IOException
    {
        String tableName = "test_analyze_table_canonicalization";
        // Write partitions with varchar values '01'/'02'/'03' so the directory
        // names are non-canonical once read back through an integer column.
        // NOTE(review): the temp dir created here is never cleaned up — acceptable for a test.
        assertUpdate(format("CREATE TABLE %s (a_varchar varchar, month varchar) WITH (partitioned_by = ARRAY['month'], external_location='%s')", tableName, com.google.common.io.Files.createTempDir().getPath()));

        assertUpdate(format("INSERT INTO %s VALUES ('A', '01'), ('B', '01'), ('C', '02'), ('D', '03')", tableName), 4);

        // Resolve the table's base location by stripping the partition dir and file name from $path.
        String tableLocation = (String) computeActual(format("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*/[^/]*$', '') FROM %s", tableName)).getOnlyValue();

        // Point a second external table with an INTEGER partition column at the
        // same data, so partition dirs like month=01 are non-canonical.
        String externalTableName = "external_" + tableName;
        assertUpdate(format(
                "CREATE TABLE %s (a_varchar varchar, month integer) WITH (partitioned_by = ARRAY['month'], external_location='%s')", externalTableName, tableLocation));

        assertUpdate(format("CALL system.sync_partition_metadata('%s', '%s', 'ADD')", SCHEMA, externalTableName));
        assertQuery(format("SELECT * FROM \"%s$partitions\"", externalTableName), "SELECT * FROM VALUES 1, 2, 3");
        // Full-table ANALYZE must succeed despite the non-canonical stored values.
        assertUpdate(format("ANALYZE %s", externalTableName), 4);
        assertQuery(format("SHOW STATS FOR %s", externalTableName),
                "SELECT * FROM VALUES " +
                        "('a_varchar', 4.0, 2.0, 0.0, null, null, null, null), " +
                        "('month', null, 3.0, 0.0, null, 1, 3, null), " +
                        "(null, null, null, null, 4.0, null, null, null)");

        assertUpdate(format("INSERT INTO %s VALUES ('E', '04')", tableName), 1);
        assertUpdate(format("CALL system.sync_partition_metadata('%s', '%s', 'ADD')", SCHEMA, externalTableName));
        assertQuery(format("SELECT * FROM \"%s$partitions\"", externalTableName), "SELECT * FROM VALUES 1, 2, 3, 4");
        // Selective ANALYZE using the non-canonical spelling '04' works...
        assertUpdate(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['04']])", externalTableName), 1);
        assertQuery(format("SHOW STATS FOR %s", externalTableName),
                "SELECT * FROM VALUES " +
                        "('a_varchar', 5.0, 2.0, 0.0, null, null, null, null), " +
                        "('month', null, 4.0, 0.0, null, 1, 4, null), " +
                        "(null, null, null, null, 5.0, null, null, null)");
        // TODO fix selective ANALYZE for table with non-canonical partition values
        // ...but the canonical spelling '4' currently fails, since the stored
        // partition name is month=04.
        assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['4']])", externalTableName),
                format("Partition no longer exists: %s.%s/month=4", SCHEMA, externalTableName));

        assertUpdate(format("DROP TABLE %s", tableName));
    }
}
Loading