Skip to content

Commit

Permalink
[fix](statistics)Fix drop stats log editlog bug. Catch drop stats exc…
Browse files Browse the repository at this point in the history
…eption while truncate table. (#40738)

Bug fix.
1. Fix drop stats log editlog NPE bug.
2. Catch drop stats exception while truncate table.
  • Loading branch information
Jibing-Li committed Sep 19, 2024
1 parent c4c94c8 commit 817766f
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2693,11 +2693,7 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
LOG.debug("logModifyTableAddOrDropInvertedIndices info:{}", info);
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info);
// Drop table column stats after light schema change finished.
try {
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
} catch (Exception e) {
LOG.info("Failed to drop stats after light schema change. Reason: {}", e.getMessage());
}
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);

if (isDropIndex) {
// send drop rpc to be
Expand All @@ -2723,11 +2719,7 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
LOG.debug("logModifyTableAddOrDropColumns info:{}", info);
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropColumns(info);
// Drop table column stats after light schema change finished.
try {
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
} catch (Exception e) {
LOG.info("Failed to drop stats after light schema change. Reason: {}", e.getMessage());
}
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
}
LOG.info("finished modify table's add or drop or modify columns. table: {}, job: {}, is replay: {}",
olapTable.getName(), jobId, isReplay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,7 @@ protected void runRunningJob() throws AlterCancelException {
changeTableState(dbId, tableId, OlapTableState.NORMAL);
LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId);
// Drop table column stats after schema change finished.
try {
Env.getCurrentEnv().getAnalysisManager().dropStats(tbl);
} catch (Exception e) {
LOG.info("Failed to drop stats after schema change finished. Reason: {}", e.getMessage());
}
Env.getCurrentEnv().getAnalysisManager().dropStats(tbl);
}

private void onFinished(OlapTable tbl) {
Expand Down
6 changes: 1 addition & 5 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -4528,11 +4528,7 @@ private void renameColumn(Database db, OlapTable table, String colName,
indexIdToSchemaVersion);
editLog.logColumnRename(info);
LOG.info("rename coloumn[{}] to {}", colName, newColName);
try {
Env.getCurrentEnv().getAnalysisManager().dropStats(table);
} catch (Exception e) {
LOG.info("Failed to drop stats after rename column. Reason: {}", e.getMessage());
}
Env.getCurrentEnv().getAnalysisManager().dropStats(table);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3118,9 +3118,6 @@ public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest request
InvalidateStatsTarget target = GsonUtils.GSON.fromJson(request.key, InvalidateStatsTarget.class);
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
TableStatsMeta tableStats = analysisManager.findTableStatsStatus(target.tableId);
if (tableStats == null) {
return new TStatus(TStatusCode.OK);
}
analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId, target.columns, tableStats);
return new TStatus(TStatusCode.OK);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,33 +690,45 @@ public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
return;
}

TableStatsMeta tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
if (tableStats == null) {
return;
}
Set<String> cols = dropStatsStmt.getColumnNames();
long catalogId = dropStatsStmt.getCatalogIdId();
long dbId = dropStatsStmt.getDbId();
long tblId = dropStatsStmt.getTblId();
TableStatsMeta tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
if (tableStats == null) {
return;
// Remove tableMetaStats if drop whole table stats.
if (dropStatsStmt.isAllColumns()) {
removeTableStats(tblId);
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tblId));
}
invalidateLocalStats(catalogId, dbId, tblId, dropStatsStmt.isAllColumns() ? null : cols, tableStats);
// Drop stats ddl is master only operation.
invalidateRemoteStats(catalogId, dbId, tblId, cols, dropStatsStmt.isAllColumns());
StatisticsRepository.dropStatistics(tblId, cols);
}

public void dropStats(TableIf table) throws DdlException {
TableStatsMeta tableStats = findTableStatsStatus(table.getId());
if (tableStats == null) {
return;
public void dropStats(TableIf table) {
try {
TableStatsMeta tableStats = findTableStatsStatus(table.getId());
if (tableStats == null) {
return;
}
long catalogId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
long tableId = table.getId();
removeTableStats(tableId);
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId));
Set<String> cols = table.getSchemaAllIndexes(false).stream().map(Column::getName)
.collect(Collectors.toSet());
invalidateLocalStats(catalogId, dbId, tableId, null, tableStats);
// Drop stats ddl is master only operation.
invalidateRemoteStats(catalogId, dbId, tableId, cols, true);
StatisticsRepository.dropStatistics(table.getId(), cols);
} catch (Throwable e) {
LOG.warn("Failed to drop stats for table {}", table.getName(), e);
}
long catalogId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
long tableId = table.getId();
Set<String> cols = table.getSchemaAllIndexes(false).stream().map(Column::getName).collect(Collectors.toSet());
invalidateLocalStats(catalogId, dbId, tableId, null, tableStats);
// Drop stats ddl is master only operation.
invalidateRemoteStats(catalogId, dbId, tableId, cols, true);
StatisticsRepository.dropStatistics(table.getId(), cols);
}

public void dropCachedStats(long catalogId, long dbId, long tableId) {
Expand All @@ -739,14 +751,9 @@ public void dropCachedStats(long catalogId, long dbId, long tableId) {

public void invalidateLocalStats(long catalogId, long dbId, long tableId,
Set<String> columns, TableStatsMeta tableStats) {
if (tableStats == null) {
return;
}
TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId);
StatisticsCache statsCache = Env.getCurrentEnv().getStatisticsCache();
boolean allColumn = false;
if (columns == null) {
allColumn = true;
columns = table.getSchemaAllIndexes(false)
.stream().map(Column::getName).collect(Collectors.toSet());
}
Expand All @@ -759,18 +766,16 @@ public void invalidateLocalStats(long catalogId, long dbId, long tableId,
indexIds.add(-1L);
}
for (long indexId : indexIds) {
tableStats.removeColumn(column);
if (tableStats != null) {
tableStats.removeColumn(column);
}
statsCache.invalidate(tableId, indexId, column);
}
}
// To remove stale column name that is changed before.
if (allColumn) {
tableStats.removeAllColumn();
tableStats.clearIndexesRowCount();
removeTableStats(tableId);
if (tableStats != null) {
tableStats.updatedTime = 0;
tableStats.userInjected = false;
}
tableStats.updatedTime = 0;
tableStats.userInjected = false;
}

public void invalidateRemoteStats(long catalogId, long dbId, long tableId,
Expand All @@ -780,18 +785,15 @@ public void invalidateRemoteStats(long catalogId, long dbId, long tableId,
request.key = GsonUtils.GSON.toJson(target);
StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache();
SystemInfoService.HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
boolean success = true;
for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
// Skip master
if (selfNode.getHost().equals(frontend.getHost())) {
continue;
}
success = success && statisticsCache.invalidateStats(frontend, request);
statisticsCache.invalidateStats(frontend, request);
}
if (!success) {
// If any rpc failed, use edit log to sync table stats to non-master FEs.
LOG.warn("Failed to invalidate all remote stats by rpc for table {}, use edit log.", tableId);
TableStatsMeta tableStats = findTableStatsStatus(tableId);
TableStatsMeta tableStats = findTableStatsStatus(tableId);
if (tableStats != null) {
logCreateTableStats(tableStats);
}
}
Expand Down
179 changes: 179 additions & 0 deletions regression-test/suites/statistics/test_drop_stats_and_truncate.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_drop_stats_and_truncate") {

sql """drop database if exists test_drop_stats_and_truncate"""
sql """create database test_drop_stats_and_truncate"""
sql """use test_drop_stats_and_truncate"""
sql """set global enable_auto_analyze=false"""

sql """CREATE TABLE non_part (
r_regionkey int NOT NULL,
r_name VARCHAR(25) NOT NULL,
r_comment VARCHAR(152)
)ENGINE=OLAP
DUPLICATE KEY(`r_regionkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`r_regionkey`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
"""
sql """CREATE TABLE `part` (
`id` INT NULL,
`colint` INT NULL,
`coltinyint` tinyint NULL,
`colsmallint` smallINT NULL,
`colbigint` bigINT NULL,
`collargeint` largeINT NULL,
`colfloat` float NULL,
`coldouble` double NULL,
`coldecimal` decimal(27, 9) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
PARTITION BY RANGE(`id`)
(
PARTITION p1 VALUES [("-2147483648"), ("10000")),
PARTITION p2 VALUES [("10000"), ("20000")),
PARTITION p3 VALUES [("20000"), ("30000"))
)
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
)
"""
sql """insert into non_part values (1, "1", "1");"""
sql """analyze table non_part with sync"""

def result = sql """show column cached stats non_part"""
assertEquals(3, result.size())
result = sql """show column stats non_part"""
assertEquals(3, result.size())
result = sql """show table stats non_part"""
def all_columns = result[0][4]
String[] columns = all_columns.split(",");
assertEquals(3, columns.size())

sql """drop stats non_part(r_comment)"""
result = sql """show column cached stats non_part"""
assertEquals(2, result.size())
result = sql """show column stats non_part"""
assertEquals(2, result.size())
result = sql """show table stats non_part"""
all_columns = result[0][4]
columns = all_columns.split(",");
assertEquals(2, columns.size())

sql """drop stats non_part"""
result = sql """show column cached stats non_part"""
assertEquals(0, result.size())
result = sql """show column stats non_part"""
assertEquals(0, result.size())
result = sql """show table stats non_part"""
all_columns = result[0][4]
assertEquals("", all_columns)

sql """analyze table non_part with sync"""
result = sql """show column cached stats non_part"""
assertEquals(3, result.size())
result = sql """show column stats non_part"""
assertEquals(3, result.size())
result = sql """show table stats non_part"""
all_columns = result[0][4]
columns = all_columns.split(",");
assertEquals(3, columns.size())

sql """truncate table non_part"""
result = sql """show column stats non_part"""
assertEquals(0, result.size())
result = sql """show table stats non_part"""
all_columns = result[0][4]
assertEquals("", all_columns)

sql """Insert into part values (1, 1, 1, 1, 1, 1, 1.1, 1.1, 1.1), (2, 2, 2, 2, 2, 2, 2.2, 2.2, 2.2), (3, 3, 3, 3, 3, 3, 3.3, 3.3, 3.3),(4, 4, 4, 4, 4, 4, 4.4, 4.4, 4.4),(5, 5, 5, 5, 5, 5, 5.5, 5.5, 5.5),(6, 6, 6, 6, 6, 6, 6.6, 6.6, 6.6),(10001, 10001, 10001, 10001, 10001, 10001, 10001.10001, 10001.10001, 10001.10001),(10002, 10002, 10002, 10002, 10002, 10002, 10002.10002, 10002.10002, 10002.10002),(10003, 10003, 10003, 10003, 10003, 10003, 10003.10003, 10003.10003, 10003.10003),(10004, 10004, 10004, 10004, 10004, 10004, 10004.10004, 10004.10004, 10004.10004),(10005, 10005, 10005, 10005, 10005, 10005, 10005.10005, 10005.10005, 10005.10005),(10006, 10006, 10006, 10006, 10006, 10006, 10006.10006, 10006.10006, 10006.10006),(20001, 20001, 20001, 20001, 20001, 20001, 20001.20001, 20001.20001, 20001.20001),(20002, 20002, 20002, 20002, 20002, 20002, 20002.20002, 20002.20002, 20002.20002),(20003, 20003, 20003, 20003, 20003, 20003, 20003.20003, 20003.20003, 20003.20003),(20004, 20004, 20004, 20004, 20004, 20004, 20004.20004, 20004.20004, 20004.20004),(20005, 20005, 20005, 20005, 20005, 20005, 20005.20005, 20005.20005, 20005.20005),(20006, 20006, 20006, 20006, 20006, 20006, 20006.20006, 20006.20006, 20006.20006)"""
sql """analyze table part with sync"""
result = sql """show column cached stats part"""
assertEquals(9, result.size())
result = sql """show column stats part"""
assertEquals(9, result.size())
result = sql """show table stats part"""
all_columns = result[0][4]
columns = all_columns.split(",");
assertEquals(9, columns.size())

sql """drop stats part(colint)"""
result = sql """show column cached stats part"""
assertEquals(8, result.size())
result = sql """show column stats part"""
assertEquals(8, result.size())
result = sql """show table stats part"""
all_columns = result[0][4]
columns = all_columns.split(",");
assertEquals(8, columns.size())

sql """drop stats part"""
result = sql """show column cached stats part"""
assertEquals(0, result.size())
result = sql """show column stats part"""
assertEquals(0, result.size())
result = sql """show table stats part"""
all_columns = result[0][4]
assertEquals("", all_columns)

sql """analyze table part with sync"""
result = sql """show column cached stats part"""
assertEquals(9, result.size())
result = sql """show column stats part"""
assertEquals(9, result.size())
result = sql """show table stats part"""
all_columns = result[0][4]
columns = all_columns.split(",");
assertEquals(9, columns.size())

sql """truncate table part"""
result = sql """show column stats part"""
assertEquals(0, result.size())
result = sql """show table stats part"""
all_columns = result[0][4]
assertEquals("", all_columns)

sql """Insert into part values (1, 1, 1, 1, 1, 1, 1.1, 1.1, 1.1), (2, 2, 2, 2, 2, 2, 2.2, 2.2, 2.2), (3, 3, 3, 3, 3, 3, 3.3, 3.3, 3.3),(4, 4, 4, 4, 4, 4, 4.4, 4.4, 4.4),(5, 5, 5, 5, 5, 5, 5.5, 5.5, 5.5),(6, 6, 6, 6, 6, 6, 6.6, 6.6, 6.6),(10001, 10001, 10001, 10001, 10001, 10001, 10001.10001, 10001.10001, 10001.10001),(10002, 10002, 10002, 10002, 10002, 10002, 10002.10002, 10002.10002, 10002.10002),(10003, 10003, 10003, 10003, 10003, 10003, 10003.10003, 10003.10003, 10003.10003),(10004, 10004, 10004, 10004, 10004, 10004, 10004.10004, 10004.10004, 10004.10004),(10005, 10005, 10005, 10005, 10005, 10005, 10005.10005, 10005.10005, 10005.10005),(10006, 10006, 10006, 10006, 10006, 10006, 10006.10006, 10006.10006, 10006.10006),(20001, 20001, 20001, 20001, 20001, 20001, 20001.20001, 20001.20001, 20001.20001),(20002, 20002, 20002, 20002, 20002, 20002, 20002.20002, 20002.20002, 20002.20002),(20003, 20003, 20003, 20003, 20003, 20003, 20003.20003, 20003.20003, 20003.20003),(20004, 20004, 20004, 20004, 20004, 20004, 20004.20004, 20004.20004, 20004.20004),(20005, 20005, 20005, 20005, 20005, 20005, 20005.20005, 20005.20005, 20005.20005),(20006, 20006, 20006, 20006, 20006, 20006, 20006.20006, 20006.20006, 20006.20006)"""
sql """analyze table part with sync"""
result = sql """show column cached stats part"""
assertEquals(9, result.size())
result = sql """show column stats part"""
assertEquals(9, result.size())
result = sql """show table stats part"""
all_columns = result[0][4]
columns = all_columns.split(",");
assertEquals(9, columns.size())

sql """truncate table part partition(p1)"""
result = sql """show column cached stats part"""
assertEquals(9, result.size())
result = sql """show column stats part"""
assertEquals(9, result.size())
result = sql """show table stats part"""
all_columns = result[0][4]
columns = all_columns.split(",");
assertEquals(9, columns.size())

sql """drop database if exists test_drop_stats_and_truncate"""
}

0 comments on commit 817766f

Please sign in to comment.