diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index b5899435343b13..864e3ece1ef35b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -52,12 +52,16 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -68,6 +72,10 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable, GsonPos // to avoid erase log ahead of drop log private static final long minEraseLatency = 10 * 60 * 1000; // 10 min + private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + private Map idToDatabase; private Map idToTable; private Map idToPartition; @@ -85,36 +93,36 @@ public CatalogRecycleBin() { idToRecycleTime = Maps.newHashMap(); } - public synchronized boolean allTabletsInRecycledStatus(List backendTabletIds) { - Set recycledTabletSet = Sets.newHashSet(); - - Iterator> iterator = idToPartition.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - RecyclePartitionInfo partitionInfo = entry.getValue(); - Partition partition = partitionInfo.getPartition(); - addRecycledTabletsForPartition(recycledTabletSet, partition); - } + public boolean allTabletsInRecycledStatus(List backendTabletIds) { + readLock.lock(); + try { + Set recycledTabletSet = Sets.newHashSet(); - Iterator> tableIter = idToTable.entrySet().iterator(); - while (tableIter.hasNext()) { - Map.Entry entry = tableIter.next(); - RecycleTableInfo tableInfo = entry.getValue(); - Table table = tableInfo.getTable(); - addRecycledTabletsForTable(recycledTabletSet, table); - } + for (Map.Entry entry : idToPartition.entrySet()) { + RecyclePartitionInfo partitionInfo = entry.getValue(); + Partition partition = partitionInfo.getPartition(); + addRecycledTabletsForPartition(recycledTabletSet, partition); + } - Iterator> dbIterator = idToDatabase.entrySet().iterator(); - while (dbIterator.hasNext()) { - Map.Entry entry = dbIterator.next(); - RecycleDatabaseInfo dbInfo = entry.getValue(); - Database db = dbInfo.getDb(); - for (Table table : db.getTables()) { + for (Map.Entry entry : idToTable.entrySet()) { + RecycleTableInfo tableInfo = entry.getValue(); + Table table = tableInfo.getTable(); addRecycledTabletsForTable(recycledTabletSet, table); } - } - return recycledTabletSet.size() >= backendTabletIds.size() && recycledTabletSet.containsAll(backendTabletIds); + for (Map.Entry entry : idToDatabase.entrySet()) { + RecycleDatabaseInfo dbInfo = entry.getValue(); + Database db = dbInfo.getDb(); + for (Table table : db.getTables()) { + addRecycledTabletsForTable(recycledTabletSet, table); + } + } + + return recycledTabletSet.size() >= backendTabletIds.size() + && recycledTabletSet.containsAll(backendTabletIds); + } finally { + readLock.unlock(); + } } private void addRecycledTabletsForTable(Set recycledTabletSet, Table table) { @@ -135,188 +143,300 @@ private void 
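// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the locking pattern the change
// above adopts. A single ReentrantReadWriteLock replaces method-level
// "synchronized", so concurrent readers no longer block each other while
// writers still get exclusive access, and every lock() is paired with an
// unlock() in a finally block. Class and field names below are hypothetical;
// only the lock/try/finally shape mirrors the patch.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RecycleIndexSketch {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();
    private final Map<Long, Long> idToRecycleTime = new HashMap<>();

    // Read path: shared lock, several callers may hold it at the same time.
    Long getRecycleTime(long id) {
        readLock.lock();
        try {
            return idToRecycleTime.get(id);
        } finally {
            readLock.unlock();
        }
    }

    // Write path: exclusive lock, released in finally even if the body throws.
    void setRecycleTime(long id, long recycleTime) {
        writeLock.lock();
        try {
            idToRecycleTime.put(id, recycleTime);
        } finally {
            writeLock.unlock();
        }
    }
}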
addRecycledTabletsForPartition(Set recycledTabletSet, Partiti } } - public synchronized boolean recycleDatabase(Database db, Set tableNames, Set tableIds, + public boolean recycleDatabase(Database db, Set tableNames, Set tableIds, boolean isReplay, boolean isForceDrop, long replayRecycleTime) { - long recycleTime = 0; - if (idToDatabase.containsKey(db.getId())) { - LOG.error("db[{}] already in recycle bin.", db.getId()); - return false; - } - - // db should be empty. all tables are recycled before - Preconditions.checkState(db.getTables().isEmpty()); + writeLock.lock(); + try { + long recycleTime = 0; + if (idToDatabase.containsKey(db.getId())) { + LOG.error("db[{}] already in recycle bin.", db.getId()); + return false; + } - // recycle db - RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db, tableNames, tableIds); - idToDatabase.put(db.getId(), databaseInfo); - if (isForceDrop) { - // The 'force drop' database should be recycle immediately. - recycleTime = 0; - } else if (!isReplay || replayRecycleTime == 0) { - recycleTime = System.currentTimeMillis(); - } else { - recycleTime = replayRecycleTime; + // db should be empty. all tables are recycled before + Preconditions.checkState(db.getTables().isEmpty()); + + // recycle db + RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db, tableNames, tableIds); + idToDatabase.put(db.getId(), databaseInfo); + if (isForceDrop) { + // the 'force drop' database should be recycle immediately. + recycleTime = 0; + } else if (!isReplay || replayRecycleTime == 0) { + recycleTime = System.currentTimeMillis(); + } else { + recycleTime = replayRecycleTime; + } + idToRecycleTime.put(db.getId(), recycleTime); + LOG.info("recycle db[{}-{}], is force drop: {}", db.getId(), db.getFullName(), isForceDrop); + return true; + } finally { + writeLock.unlock(); } - idToRecycleTime.put(db.getId(), recycleTime); - LOG.info("recycle db[{}-{}], is force drop: {}", db.getId(), db.getFullName(), isForceDrop); - return true; } - public synchronized boolean recycleTable(long dbId, Table table, boolean isReplay, + public boolean recycleTable(long dbId, Table table, boolean isReplay, boolean isForceDrop, long replayRecycleTime) { - long recycleTime = 0; - if (idToTable.containsKey(table.getId())) { + // check if the table is already in the recycle bin + boolean alreadyExists = false; + readLock.lock(); + try { + alreadyExists = idToTable.containsKey(table.getId()); + } finally { + readLock.unlock(); + } + + if (alreadyExists) { LOG.error("table[{}] already in recycle bin.", table.getId()); return false; } - // recycle table - RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table); + // calculate the recovery time + long recycleTime = 0; if (isForceDrop) { - // The 'force drop' table should be recycle immediately. + // the 'force drop' table should be recycle immediately. 
recycleTime = 0; } else if (!isReplay || replayRecycleTime == 0) { recycleTime = System.currentTimeMillis(); } else { recycleTime = replayRecycleTime; } - idToRecycleTime.put(table.getId(), recycleTime); - idToTable.put(table.getId(), tableInfo); - LOG.info("recycle table[{}-{}], is force drop: {}", table.getId(), table.getName(), isForceDrop); - return true; + + // create recovery table information + RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table); + + // acquire a write lock to update the internal state + writeLock.lock(); + try { + // check again if the table is already in the recycle bin + // it may have been added by another thread after we released the read lock + if (idToTable.containsKey(table.getId())) { + LOG.error("table[{}] already in recycle bin.", table.getId()); + return false; + } + + idToRecycleTime.put(table.getId(), recycleTime); + idToTable.put(table.getId(), tableInfo); + LOG.info("recycle table[{}-{}], is force drop: {}", table.getId(), table.getName(), isForceDrop); + return true; + } finally { + writeLock.unlock(); + } } - public synchronized boolean recyclePartition(long dbId, long tableId, String tableName, Partition partition, + public boolean recyclePartition(long dbId, long tableId, String tableName, Partition partition, Range range, PartitionItem listPartitionItem, DataProperty dataProperty, ReplicaAllocation replicaAlloc, boolean isInMemory, boolean isMutable) { - if (idToPartition.containsKey(partition.getId())) { + // check if the table is already in the recycle bin + boolean alreadyExists = false; + readLock.lock(); + try { + alreadyExists = idToPartition.containsKey(partition.getId()); + } finally { + readLock.unlock(); + } + + if (alreadyExists) { LOG.error("partition[{}] already in recycle bin.", partition.getId()); return false; } - // recycle partition + // create recovery partition information RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId, tableId, partition, range, listPartitionItem, dataProperty, replicaAlloc, isInMemory, isMutable); - idToRecycleTime.put(partition.getId(), System.currentTimeMillis()); - idToPartition.put(partition.getId(), partitionInfo); - LOG.info("recycle partition[{}-{}] of table [{}-{}]", partition.getId(), partition.getName(), - tableId, tableName); - return true; + long recycleTime = System.currentTimeMillis(); + + // acquire a write lock to update the internal status + writeLock.lock(); + try { + if (idToPartition.containsKey(partition.getId())) { + LOG.error("partition[{}] already in recycle bin.", partition.getId()); + return false; + } + + idToRecycleTime.put(partition.getId(), recycleTime); + idToPartition.put(partition.getId(), partitionInfo); + LOG.info("recycle partition[{}-{}] of table [{}-{}]", partition.getId(), partition.getName(), + tableId, tableName); + return true; + } finally { + writeLock.unlock(); + } } - public synchronized Long getRecycleTimeById(long id) { - return idToRecycleTime.get(id); + public Long getRecycleTimeById(long id) { + readLock.lock(); + try { + return idToRecycleTime.get(id); + } finally { + readLock.unlock(); + } } - public synchronized void setRecycleTimeByIdForReplay(long id, Long recycleTime) { - idToRecycleTime.put(id, recycleTime); + public void setRecycleTimeByIdForReplay(long id, Long recycleTime) { + writeLock.lock(); + try { + idToRecycleTime.put(id, recycleTime); + } finally { + writeLock.unlock(); + } } - public synchronized boolean isRecycleDatabase(long dbId) { - return idToDatabase.containsKey(dbId); + public boolean 
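// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the check / re-check shape used
// by recycleTable() and recyclePartition() above. A ReentrantReadWriteLock read
// lock cannot be upgraded to the write lock, so the fast-path check runs under
// the read lock, the entry is prepared with no lock held, and the write lock
// re-validates before inserting. Assumes the readLock/writeLock fields added
// above; idToInfo and buildInfo() are hypothetical stand-ins for the real maps
// and for constructing RecycleTableInfo / RecyclePartitionInfo.
private boolean recycleSketch(java.util.Map<Long, Object> idToInfo, long id) {
    readLock.lock();
    try {
        if (idToInfo.containsKey(id)) {
            return false;                 // fast path: already in the recycle bin
        }
    } finally {
        readLock.unlock();
    }

    Object info = buildInfo(id);          // hypothetical; built without holding any lock

    writeLock.lock();
    try {
        if (idToInfo.containsKey(id)) {   // another thread may have inserted it meanwhile
            return false;
        }
        idToInfo.put(id, info);
        return true;
    } finally {
        writeLock.unlock();
    }
}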
isRecycleDatabase(long dbId) { + readLock.lock(); + try { + return idToDatabase.containsKey(dbId); + } finally { + readLock.unlock(); + } } - public synchronized boolean isRecycleTable(long dbId, long tableId) { - return isRecycleDatabase(dbId) || idToTable.containsKey(tableId); + public boolean isRecycleTable(long dbId, long tableId) { + readLock.lock(); + try { + return isRecycleDatabase(dbId) || idToTable.containsKey(tableId); + } finally { + readLock.unlock(); + } } - public synchronized boolean isRecyclePartition(long dbId, long tableId, long partitionId) { - return isRecycleTable(dbId, tableId) || idToPartition.containsKey(partitionId); + public boolean isRecyclePartition(long dbId, long tableId, long partitionId) { + readLock.lock(); + try { + return isRecycleTable(dbId, tableId) || idToPartition.containsKey(partitionId); + } finally { + readLock.unlock(); + } } - public synchronized void getRecycleIds(Set dbIds, Set tableIds, Set partitionIds) { - dbIds.addAll(idToDatabase.keySet()); - tableIds.addAll(idToTable.keySet()); - partitionIds.addAll(idToPartition.keySet()); + public void getRecycleIds(Set dbIds, Set tableIds, Set partitionIds) { + readLock.lock(); + try { + dbIds.addAll(idToDatabase.keySet()); + tableIds.addAll(idToTable.keySet()); + partitionIds.addAll(idToPartition.keySet()); + } finally { + readLock.unlock(); + } } - private synchronized boolean isExpire(long id, long currentTimeMs) { - long latency = currentTimeMs - idToRecycleTime.get(id); - return (Config.catalog_trash_ignore_min_erase_latency || latency > minEraseLatency) - && latency > Config.catalog_trash_expire_second * 1000L; + private boolean isExpire(long id, long currentTimeMs) { + readLock.lock(); + try { + long latency = currentTimeMs - idToRecycleTime.get(id); + return (Config.catalog_trash_ignore_min_erase_latency || latency > minEraseLatency) + && latency > Config.catalog_trash_expire_second * 1000L; + } finally { + readLock.unlock(); + } } - private synchronized void eraseDatabase(long currentTimeMs, int keepNum) { - int eraseNum = 0; - StopWatch watch = StopWatch.createStarted(); + private void eraseDatabase(long currentTimeMs, int keepNum) { + List> databasesToErase = new ArrayList<>(); + Set dbNames = new HashSet<>(); + + // collect the expired databases to be erased within the lock + writeLock.lock(); try { - // 1. erase expired database + StopWatch watch = StopWatch.createStarted(); + // 1. collect expired databases Iterator> dbIter = idToDatabase.entrySet().iterator(); while (dbIter.hasNext()) { Map.Entry entry = dbIter.next(); RecycleDatabaseInfo dbInfo = entry.getValue(); Database db = dbInfo.getDb(); if (isExpire(db.getId(), currentTimeMs)) { - // erase db - dbIter.remove(); - idToRecycleTime.remove(entry.getKey()); - Env.getCurrentEnv().eraseDatabase(db.getId(), true); - LOG.info("erase db[{}]", db.getId()); - eraseNum++; + // collect information only, do not perform erasure within the lock + databasesToErase.add(Pair.of(db.getId(), db)); } } - // 2. erase exceed number - if (keepNum < 0) { - return; - } - Set dbNames = idToDatabase.values().stream().map(d -> d.getDb().getFullName()) - .collect(Collectors.toSet()); - for (String dbName : dbNames) { - eraseDatabaseWithSameName(dbName, currentTimeMs, keepNum); + + // 2. 
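// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): why the nested read locking in
// isRecyclePartition() -> isRecycleTable() -> isRecycleDatabase() above is
// safe. The read lock of ReentrantReadWriteLock is reentrant, so a thread that
// already holds it may acquire it again; the hold count is incremented and
// decremented symmetrically as long as each lock() has a matching unlock() in
// a finally block. The method below only demonstrates the nesting and is
// hypothetical.
private boolean nestedReadSketch(long dbId, long tableId) {
    readLock.lock();
    try {
        // Both callees take the read lock again while this frame still holds it.
        return isRecycleDatabase(dbId) || isRecycleTable(dbId, tableId);
    } finally {
        readLock.unlock();
    }
}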
if handling databases with the same name, collect the database name information + if (keepNum >= 0) { + dbNames = idToDatabase.values().stream() + .map(d -> d.getDb().getFullName()) + .collect(Collectors.toSet()); } - } finally { + watch.stop(); - LOG.info("eraseDatabase eraseNum: {} cost: {}ms", eraseNum, watch.getTime()); + LOG.info("eraseDatabase collected {} databases to erase, cost: {}ms", + databasesToErase.size(), watch.getTime()); + } finally { + writeLock.unlock(); } - } - private synchronized List getSameNameDbIdListToErase(String dbName, int maxSameNameTrashNum) { - Iterator> iterator = idToDatabase.entrySet().iterator(); - List> dbRecycleTimeLists = Lists.newArrayList(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - RecycleDatabaseInfo dbInfo = entry.getValue(); - Database db = dbInfo.getDb(); - if (db.getFullName().equals(dbName)) { - List dbRecycleTimeInfo = Lists.newArrayList(); - dbRecycleTimeInfo.add(entry.getKey()); - dbRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey())); + // perform the erasure operation outside the lock + int eraseNum = 0; + StopWatch totalWatch = StopWatch.createStarted(); + + // process expired databases + for (Pair pair : databasesToErase) { + long dbId = pair.first; + Env.getCurrentEnv().eraseDatabase(dbId, true); - dbRecycleTimeLists.add(dbRecycleTimeInfo); + // acquire the lock to update the internal state + writeLock.lock(); + try { + if (idToDatabase.containsKey(dbId)) { + idToDatabase.remove(dbId); + idToRecycleTime.remove(dbId); + LOG.info("erase db[{}]", dbId); + eraseNum++; + } + } finally { + writeLock.unlock(); } } - List dbIdToErase = Lists.newArrayList(); - if (dbRecycleTimeLists.size() <= maxSameNameTrashNum) { - return dbIdToErase; - } - // order by recycle time desc - dbRecycleTimeLists.sort((x, y) -> - (x.get(1).longValue() < y.get(1).longValue()) ? 1 : ((x.get(1).equals(y.get(1))) ? 
0 : -1)); - for (int i = maxSameNameTrashNum; i < dbRecycleTimeLists.size(); i++) { - dbIdToErase.add(dbRecycleTimeLists.get(i).get(0)); + if (keepNum >= 0) { + for (String dbName : dbNames) { + eraseDatabaseWithSameName(dbName, currentTimeMs, keepNum); + } } - return dbIdToErase; + + totalWatch.stop(); + LOG.info("eraseDatabase total eraseNum: {} cost: {}ms", eraseNum, totalWatch.getTime()); } - private synchronized void eraseDatabaseWithSameName(String dbName, long currentTimeMs, int maxSameNameTrashNum) { - List dbIdToErase = getSameNameDbIdListToErase(dbName, maxSameNameTrashNum); + private void eraseDatabaseWithSameName(String dbName, long currentTimeMs, int maxSameNameTrashNum) { + List dbIdToErase; + readLock.lock(); + try { + dbIdToErase = getSameNameDbIdListToErase(dbName, maxSameNameTrashNum); + } finally { + readLock.unlock(); + } + for (Long dbId : dbIdToErase) { - RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId); - if (!isExpireMinLatency(dbId, currentTimeMs)) { - continue; + RecycleDatabaseInfo dbInfo = null; + boolean shouldErase = false; + + writeLock.lock(); + try { + dbInfo = idToDatabase.get(dbId); + if (dbInfo != null && isExpireMinLatency(dbId, currentTimeMs)) { + shouldErase = true; + } + } finally { + writeLock.unlock(); } - eraseAllTables(dbInfo); - idToDatabase.remove(dbId); - idToRecycleTime.remove(dbId); - Env.getCurrentEnv().eraseDatabase(dbId, true); - LOG.info("erase database[{}] name: {}", dbId, dbName); - } - } - private synchronized boolean isExpireMinLatency(long id, long currentTimeMs) { - return (currentTimeMs - idToRecycleTime.get(id)) > minEraseLatency; + if (shouldErase && dbInfo != null) { + eraseAllTables(dbInfo); + Env.getCurrentEnv().eraseDatabase(dbId, true); + writeLock.lock(); + try { + if (idToDatabase.containsKey(dbId)) { + idToDatabase.remove(dbId); + idToRecycleTime.remove(dbId); + LOG.info("erase database[{}] name: {}", dbId, dbName); + } + } finally { + writeLock.unlock(); + } + } + } } private void eraseAllTables(RecycleDatabaseInfo dbInfo) { @@ -324,39 +444,64 @@ private void eraseAllTables(RecycleDatabaseInfo dbInfo) { Set tableNames = Sets.newHashSet(dbInfo.getTableNames()); Set tableIds = Sets.newHashSet(dbInfo.getTableIds()); long dbId = db.getId(); - Iterator> iterator = idToTable.entrySet().iterator(); - while (iterator.hasNext() && !tableNames.isEmpty()) { - Map.Entry entry = iterator.next(); - RecycleTableInfo tableInfo = entry.getValue(); - if (tableInfo.getDbId() != dbId || !tableNames.contains(tableInfo.getTable().getName()) - || !tableIds.contains(tableInfo.getTable().getId())) { - continue; + + List> tablesToErase = new ArrayList<>(); + + writeLock.lock(); + try { + Iterator> iterator = idToTable.entrySet().iterator(); + while (iterator.hasNext() && !tableNames.isEmpty()) { + Map.Entry entry = iterator.next(); + RecycleTableInfo tableInfo = entry.getValue(); + if (tableInfo.getDbId() != dbId || !tableNames.contains(tableInfo.getTable().getName()) + || !tableIds.contains(tableInfo.getTable().getId())) { + continue; + } + + Table table = tableInfo.getTable(); + tablesToErase.add(Pair.of(table.getId(), table)); + tableNames.remove(table.getName()); } + } finally { + writeLock.unlock(); + } + + for (Pair pair : tablesToErase) { + long tableId = pair.first; + Table table = pair.second; - Table table = tableInfo.getTable(); if (table.isManagedTable()) { Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, false); } - iterator.remove(); - idToRecycleTime.remove(table.getId()); - tableNames.remove(table.getName()); - 
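// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the collect-then-erase shape
// used by eraseDatabase(), eraseTable() and erasePartition() above. Expired ids
// are gathered under the write lock, the slow catalog / edit-log work runs with
// no lock held, and the maps are mutated only after re-checking that the entry
// still exists. idToInfo, expireMs and performExternalErase() are hypothetical
// stand-ins; the real code calls e.g. Env.getCurrentEnv().eraseDatabase(id, true).
private void eraseExpiredSketch(java.util.Map<Long, Object> idToInfo, long currentTimeMs, long expireMs) {
    java.util.List<Long> toErase = new java.util.ArrayList<>();
    writeLock.lock();
    try {
        for (java.util.Map.Entry<Long, Long> entry : idToRecycleTime.entrySet()) {
            if (idToInfo.containsKey(entry.getKey()) && currentTimeMs - entry.getValue() > expireMs) {
                toErase.add(entry.getKey());          // collect only; no external calls under the lock
            }
        }
    } finally {
        writeLock.unlock();
    }

    for (Long id : toErase) {
        performExternalErase(id);                     // hypothetical: the slow, lock-free part

        writeLock.lock();
        try {
            if (idToInfo.remove(id) != null) {        // entry may already be gone (replay, instant erase)
                idToRecycleTime.remove(id);
            }
        } finally {
            writeLock.unlock();
        }
    }
}

private void performExternalErase(long id) {
    // hypothetical placeholder for Env.getCurrentEnv().eraseDatabase(id, true) and similar calls
}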
Env.getCurrentEnv().getEditLog().logEraseTable(table.getId()); - LOG.info("erase db[{}] with table[{}]: {}", dbId, table.getId(), table.getName()); + + writeLock.lock(); + try { + if (idToTable.containsKey(tableId)) { + idToTable.remove(tableId); + idToRecycleTime.remove(tableId); + Env.getCurrentEnv().getEditLog().logEraseTable(tableId); + LOG.info("erase db[{}] with table[{}]: {}", dbId, tableId, table.getName()); + } + } finally { + writeLock.unlock(); + } } } - public synchronized void replayEraseDatabase(long dbId) { + public void replayEraseDatabase(long dbId) { idToDatabase.remove(dbId); idToRecycleTime.remove(dbId); Env.getCurrentEnv().eraseDatabase(dbId, false); LOG.info("replay erase db[{}]", dbId); } - private synchronized void eraseTable(long currentTimeMs, int keepNum) { - int eraseNum = 0; - StopWatch watch = StopWatch.createStarted(); + private void eraseTable(long currentTimeMs, int keepNum) { + List> tablesToErase = new ArrayList<>(); + Map> dbId2TableNames = Maps.newHashMap(); + + writeLock.lock(); try { - // 1. erase expired tables + StopWatch watch = StopWatch.createStarted(); Iterator> tableIter = idToTable.entrySet().iterator(); while (tableIter.hasNext()) { Map.Entry entry = tableIter.next(); @@ -365,100 +510,149 @@ private synchronized void eraseTable(long currentTimeMs, int keepNum) { long tableId = table.getId(); if (isExpire(tableId, currentTimeMs)) { - if (table.isManagedTable()) { - Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, false); + tablesToErase.add(Pair.of(tableId, table)); + } + } + + if (keepNum >= 0) { + for (RecycleTableInfo tableInfo : idToTable.values()) { + Set tblNames = dbId2TableNames.get(tableInfo.dbId); + if (tblNames == null) { + tblNames = Sets.newHashSet(); + dbId2TableNames.put(tableInfo.dbId, tblNames); } + tblNames.add(tableInfo.getTable().getName()); + } + } - // erase table - tableIter.remove(); - idToRecycleTime.remove(tableId); + watch.stop(); + LOG.info("eraseTable collected {} tables to erase, cost: {}ms", + tablesToErase.size(), watch.getTime()); + } finally { + writeLock.unlock(); + } + + int eraseNum = 0; + StopWatch totalWatch = StopWatch.createStarted(); + + for (Pair pair : tablesToErase) { + long tableId = pair.first; + Table table = pair.second; + if (table.isManagedTable()) { + Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, false); + } - // log + writeLock.lock(); + try { + if (idToTable.containsKey(tableId)) { + idToTable.remove(tableId); + idToRecycleTime.remove(tableId); Env.getCurrentEnv().getEditLog().logEraseTable(tableId); LOG.info("erase table[{}]", tableId); eraseNum++; } - } // end for tables - - // 2. 
erase exceed num - if (keepNum < 0) { - return; - } - Map> dbId2TableNames = Maps.newHashMap(); - for (RecycleTableInfo tableInfo : idToTable.values()) { - Set tblNames = dbId2TableNames.get(tableInfo.dbId); - if (tblNames == null) { - tblNames = Sets.newHashSet(); - dbId2TableNames.put(tableInfo.dbId, tblNames); - } - tblNames.add(tableInfo.getTable().getName()); + } finally { + writeLock.unlock(); } + } + + if (keepNum >= 0) { for (Map.Entry> entry : dbId2TableNames.entrySet()) { for (String tblName : entry.getValue()) { eraseTableWithSameName(entry.getKey(), tblName, currentTimeMs, keepNum); } } - } finally { - watch.stop(); - LOG.info("eraseTable eraseNum: {} cost: {}ms", eraseNum, watch.getTime()); } + + totalWatch.stop(); + LOG.info("eraseTable total eraseNum: {} cost: {}ms", eraseNum, totalWatch.getTime()); } - private synchronized List getSameNameTableIdListToErase(long dbId, String tableName, + private List getSameNameTableIdListToErase(long dbId, String tableName, int maxSameNameTrashNum) { - Iterator> iterator = idToTable.entrySet().iterator(); - List> tableRecycleTimeLists = Lists.newArrayList(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - RecycleTableInfo tableInfo = entry.getValue(); - if (tableInfo.getDbId() != dbId) { - continue; - } + readLock.lock(); + try { + Iterator> iterator = idToTable.entrySet().iterator(); + List> tableRecycleTimeLists = Lists.newArrayList(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + RecycleTableInfo tableInfo = entry.getValue(); + if (tableInfo.getDbId() != dbId) { + continue; + } - Table table = tableInfo.getTable(); - if (table.getName().equals(tableName)) { - List tableRecycleTimeInfo = Lists.newArrayList(); - tableRecycleTimeInfo.add(entry.getKey()); - tableRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey())); + Table table = tableInfo.getTable(); + if (table.getName().equals(tableName)) { + List tableRecycleTimeInfo = Lists.newArrayList(); + tableRecycleTimeInfo.add(entry.getKey()); + tableRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey())); - tableRecycleTimeLists.add(tableRecycleTimeInfo); + tableRecycleTimeLists.add(tableRecycleTimeInfo); + } } - } - List tableIdToErase = Lists.newArrayList(); - if (tableRecycleTimeLists.size() <= maxSameNameTrashNum) { - return tableIdToErase; - } - // order by recycle time desc - tableRecycleTimeLists.sort((x, y) -> - (x.get(1).longValue() < y.get(1).longValue()) ? 1 : ((x.get(1).equals(y.get(1))) ? 0 : -1)); + List tableIdToErase = Lists.newArrayList(); + if (tableRecycleTimeLists.size() <= maxSameNameTrashNum) { + return tableIdToErase; + } + // order by recycle time desc + tableRecycleTimeLists.sort((x, y) -> + (x.get(1).longValue() < y.get(1).longValue()) ? 1 : ((x.get(1).equals(y.get(1))) ? 
0 : -1)); - for (int i = maxSameNameTrashNum; i < tableRecycleTimeLists.size(); i++) { - tableIdToErase.add(tableRecycleTimeLists.get(i).get(0)); + for (int i = maxSameNameTrashNum; i < tableRecycleTimeLists.size(); i++) { + tableIdToErase.add(tableRecycleTimeLists.get(i).get(0)); + } + return tableIdToErase; + } finally { + readLock.unlock(); } - return tableIdToErase; } - private synchronized void eraseTableWithSameName(long dbId, String tableName, long currentTimeMs, + private void eraseTableWithSameName(long dbId, String tableName, long currentTimeMs, int maxSameNameTrashNum) { - List tableIdToErase = getSameNameTableIdListToErase(dbId, tableName, maxSameNameTrashNum); + List tableIdToErase; + readLock.lock(); + try { + tableIdToErase = getSameNameTableIdListToErase(dbId, tableName, maxSameNameTrashNum); + } finally { + readLock.unlock(); + } + for (Long tableId : tableIdToErase) { - RecycleTableInfo tableInfo = idToTable.get(tableId); - if (!isExpireMinLatency(tableId, currentTimeMs)) { - continue; - } - Table table = tableInfo.getTable(); - if (table.isManagedTable()) { - Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, false); + Table table = null; + boolean shouldErase = false; + + writeLock.lock(); + try { + RecycleTableInfo tableInfo = idToTable.get(tableId); + if (tableInfo != null && isExpireMinLatency(tableId, currentTimeMs)) { + table = tableInfo.getTable(); + shouldErase = true; + } + } finally { + writeLock.unlock(); } - idToTable.remove(tableId); - idToRecycleTime.remove(tableId); - Env.getCurrentEnv().getEditLog().logEraseTable(tableId); - LOG.info("erase table[{}] name: {} from db[{}]", tableId, tableName, dbId); + if (shouldErase && table != null) { + if (table.isManagedTable()) { + Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, false); + } + + writeLock.lock(); + try { + if (idToTable.containsKey(tableId)) { + idToTable.remove(tableId); + idToRecycleTime.remove(tableId); + Env.getCurrentEnv().getEditLog().logEraseTable(tableId); + LOG.info("erase table[{}] name: {} from db[{}]", tableId, tableName, dbId); + } + } finally { + writeLock.unlock(); + } + } } } - public synchronized void replayEraseTable(long tableId) { + public void replayEraseTable(long tableId) { LOG.info("before replay erase table[{}]", tableId); RecycleTableInfo tableInfo = idToTable.remove(tableId); idToRecycleTime.remove(tableId); @@ -474,110 +668,178 @@ public synchronized void replayEraseTable(long tableId) { LOG.info("replay erase table[{}]", tableId); } - private synchronized void erasePartition(long currentTimeMs, int keepNum) { - int eraseNum = 0; - StopWatch watch = StopWatch.createStarted(); + private void erasePartition(long currentTimeMs, int keepNum) { + List> partitionsToErase = new ArrayList<>(); + + writeLock.lock(); try { - // 1. erase expired partitions + StopWatch watch = StopWatch.createStarted(); Iterator> iterator = idToPartition.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); RecyclePartitionInfo partitionInfo = entry.getValue(); Partition partition = partitionInfo.getPartition(); - long partitionId = entry.getKey(); + if (isExpire(partitionId, currentTimeMs)) { - Env.getCurrentEnv().onErasePartition(partition); - // erase partition - iterator.remove(); - idToRecycleTime.remove(partitionId); - // log - Env.getCurrentEnv().getEditLog().logErasePartition(partitionId); - LOG.info("erase partition[{}]. 
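// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the "keep only the newest N
// same-named trash entries" selection implemented by
// getSameNameDbIdListToErase / getSameNameTableIdListToErase /
// getSameNamePartitionIdListToErase above. Given the (id -> recycleTime)
// candidates for one name, sort by recycle time descending and return the ids
// past the first maxSameNameTrashNum entries. Assumes maxSameNameTrashNum >= 0,
// which the callers guarantee via their keepNum >= 0 checks.
private java.util.List<Long> idsBeyondKeepNumSketch(java.util.Map<Long, Long> candidateIdToRecycleTime,
        int maxSameNameTrashNum) {
    if (candidateIdToRecycleTime.size() <= maxSameNameTrashNum) {
        return new java.util.ArrayList<>();
    }
    return candidateIdToRecycleTime.entrySet().stream()
            .sorted(java.util.Map.Entry.<Long, Long>comparingByValue(java.util.Comparator.reverseOrder()))
            .skip(maxSameNameTrashNum)                // keep the newest N, erase the rest
            .map(java.util.Map.Entry::getKey)
            .collect(java.util.stream.Collectors.toList());
}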
reason: expired", partitionId); - eraseNum++; + partitionsToErase.add(Pair.of(partitionId, partition)); } - } // end for partitions + } - // 2. erase exceed number if (keepNum < 0) { + watch.stop(); + LOG.info("erasePartition collected {} partitions to erase, cost: {}ms", + partitionsToErase.size(), watch.getTime()); return; } - com.google.common.collect.Table> dbTblId2PartitionNames = HashBasedTable.create(); - for (RecyclePartitionInfo partitionInfo : idToPartition.values()) { - Set partitionNames = dbTblId2PartitionNames.get(partitionInfo.dbId, partitionInfo.tableId); - if (partitionNames == null) { - partitionNames = Sets.newHashSet(); - dbTblId2PartitionNames.put(partitionInfo.dbId, partitionInfo.tableId, partitionNames); + + watch.stop(); + LOG.info("erasePartition collected info, cost: {}ms", watch.getTime()); + } finally { + writeLock.unlock(); + } + + int eraseNum = 0; + StopWatch totalWatch = StopWatch.createStarted(); + + for (Pair pair : partitionsToErase) { + long partitionId = pair.first; + Partition partition = pair.second; + + Env.getCurrentEnv().onErasePartition(partition); + + writeLock.lock(); + try { + if (idToPartition.containsKey(partitionId)) { + idToPartition.remove(partitionId); + idToRecycleTime.remove(partitionId); + Env.getCurrentEnv().getEditLog().logErasePartition(partitionId); + LOG.info("erase partition[{}]. reason: expired", partitionId); + eraseNum++; } - partitionNames.add(partitionInfo.getPartition().getName()); + } finally { + writeLock.unlock(); } + } + + if (keepNum >= 0) { + com.google.common.collect.Table> dbTblId2PartitionNames; + writeLock.lock(); + try { + dbTblId2PartitionNames = buildPartitionNameMap(); + } finally { + writeLock.unlock(); + } + for (Cell> cell : dbTblId2PartitionNames.cellSet()) { for (String partitionName : cell.getValue()) { - erasePartitionWithSameName(cell.getRowKey(), cell.getColumnKey(), partitionName, currentTimeMs, - keepNum); + erasePartitionWithSameName(cell.getRowKey(), cell.getColumnKey(), partitionName, + currentTimeMs, keepNum); } } - } finally { - watch.stop(); - LOG.info("erasePartition eraseNum: {} cost: {}ms", eraseNum, watch.getTime()); } + + totalWatch.stop(); + LOG.info("erasePartition total eraseNum: {} cost: {}ms", eraseNum, totalWatch.getTime()); + } + + private com.google.common.collect.Table> buildPartitionNameMap() { + com.google.common.collect.Table> dbTblId2PartitionNames = HashBasedTable.create(); + for (RecyclePartitionInfo partitionInfo : idToPartition.values()) { + Set partitionNames = dbTblId2PartitionNames.get(partitionInfo.dbId, partitionInfo.tableId); + if (partitionNames == null) { + partitionNames = Sets.newHashSet(); + dbTblId2PartitionNames.put(partitionInfo.dbId, partitionInfo.tableId, partitionNames); + } + partitionNames.add(partitionInfo.getPartition().getName()); + } + return dbTblId2PartitionNames; } - private synchronized List getSameNamePartitionIdListToErase(long dbId, long tableId, String partitionName, + private List getSameNamePartitionIdListToErase(long dbId, long tableId, String partitionName, int maxSameNameTrashNum) { - Iterator> iterator = idToPartition.entrySet().iterator(); - List> partitionRecycleTimeLists = Lists.newArrayList(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - RecyclePartitionInfo partitionInfo = entry.getValue(); - if (partitionInfo.getDbId() != dbId || partitionInfo.getTableId() != tableId) { - continue; + readLock.lock(); + try { + List> partitionRecycleTimeLists = Lists.newArrayList(); + for (Map.Entry entry : 
idToPartition.entrySet()) { + RecyclePartitionInfo partitionInfo = entry.getValue(); + if (partitionInfo.getDbId() != dbId || partitionInfo.getTableId() != tableId) { + continue; + } + + Partition partition = partitionInfo.getPartition(); + if (partition.getName().equals(partitionName)) { + List partitionRecycleTimeInfo = Lists.newArrayList(); + partitionRecycleTimeInfo.add(entry.getKey()); + partitionRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey())); + partitionRecycleTimeLists.add(partitionRecycleTimeInfo); + } } - Partition partition = partitionInfo.getPartition(); - if (partition.getName().equals(partitionName)) { - List partitionRecycleTimeInfo = Lists.newArrayList(); - partitionRecycleTimeInfo.add(entry.getKey()); - partitionRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey())); + if (partitionRecycleTimeLists.size() <= maxSameNameTrashNum) { + return Lists.newArrayList(); + } + + // order by recycle time desc + partitionRecycleTimeLists.sort((x, y) -> + (x.get(1).longValue() < y.get(1).longValue()) ? 1 : ((x.get(1).equals(y.get(1))) ? 0 : -1)); - partitionRecycleTimeLists.add(partitionRecycleTimeInfo); + List partitionIdToErase = Lists.newArrayList(); + for (int i = maxSameNameTrashNum; i < partitionRecycleTimeLists.size(); i++) { + partitionIdToErase.add(partitionRecycleTimeLists.get(i).get(0)); } - } - List partitionIdToErase = Lists.newArrayList(); - if (partitionRecycleTimeLists.size() <= maxSameNameTrashNum) { return partitionIdToErase; + } finally { + readLock.unlock(); } - // order by recycle time desc - partitionRecycleTimeLists.sort((x, y) -> - (x.get(1).longValue() < y.get(1).longValue()) ? 1 : ((x.get(1).equals(y.get(1))) ? 0 : -1)); - - for (int i = maxSameNameTrashNum; i < partitionRecycleTimeLists.size(); i++) { - partitionIdToErase.add(partitionRecycleTimeLists.get(i).get(0)); - } - return partitionIdToErase; } - private synchronized void erasePartitionWithSameName(long dbId, long tableId, String partitionName, + private void erasePartitionWithSameName(long dbId, long tableId, String partitionName, long currentTimeMs, int maxSameNameTrashNum) { - List partitionIdToErase = getSameNamePartitionIdListToErase(dbId, tableId, partitionName, - maxSameNameTrashNum); + List partitionIdToErase; + readLock.lock(); + try { + partitionIdToErase = getSameNamePartitionIdListToErase(dbId, tableId, partitionName, + maxSameNameTrashNum); + } finally { + readLock.unlock(); + } + for (Long partitionId : partitionIdToErase) { - RecyclePartitionInfo partitionInfo = idToPartition.get(partitionId); - if (!isExpireMinLatency(partitionId, currentTimeMs)) { - continue; + Partition partition = null; + boolean shouldErase = false; + + writeLock.lock(); + try { + RecyclePartitionInfo partitionInfo = idToPartition.get(partitionId); + if (partitionInfo != null && isExpireMinLatency(partitionId, currentTimeMs)) { + partition = partitionInfo.getPartition(); + shouldErase = true; + } + } finally { + writeLock.unlock(); } - Partition partition = partitionInfo.getPartition(); - Env.getCurrentEnv().onErasePartition(partition); - idToPartition.remove(partitionId); - idToRecycleTime.remove(partitionId); - Env.getCurrentEnv().getEditLog().logErasePartition(partitionId); - LOG.info("erase partition[{}] name: {} from table[{}] from db[{}]", partitionId, partitionName, tableId, - dbId); + if (shouldErase && partition != null) { + Env.getCurrentEnv().onErasePartition(partition); + + writeLock.lock(); + try { + if (idToPartition.containsKey(partitionId)) { + idToPartition.remove(partitionId); + 
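// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the same (dbId, tableId) ->
// partition-name grouping that buildPartitionNameMap() produces above,
// expressed with plain JDK maps and computeIfAbsent instead of a Guava
// HashBasedTable. Shown only to clarify the shape of the data; the patch
// itself keeps the Guava Table.
private java.util.Map<Long, java.util.Map<Long, java.util.Set<String>>> groupPartitionNamesSketch(
        java.util.Collection<RecyclePartitionInfo> infos) {
    java.util.Map<Long, java.util.Map<Long, java.util.Set<String>>> byDbAndTable = new java.util.HashMap<>();
    for (RecyclePartitionInfo info : infos) {
        byDbAndTable
                .computeIfAbsent(info.getDbId(), k -> new java.util.HashMap<>())
                .computeIfAbsent(info.getTableId(), k -> new java.util.HashSet<>())
                .add(info.getPartition().getName());
    }
    return byDbAndTable;
}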
idToRecycleTime.remove(partitionId); + Env.getCurrentEnv().getEditLog().logErasePartition(partitionId); + LOG.info("erase partition[{}] name: {} from table[{}] from db[{}]", + partitionId, partitionName, tableId, dbId); + } + } finally { + writeLock.unlock(); + } + } } } - public synchronized void replayErasePartition(long partitionId) { + public void replayErasePartition(long partitionId) { RecyclePartitionInfo partitionInfo = idToPartition.remove(partitionId); idToRecycleTime.remove(partitionId); @@ -592,7 +854,7 @@ public synchronized void replayErasePartition(long partitionId) { LOG.info("replay erase partition[{}]", partitionId); } - public synchronized Database recoverDatabase(String dbName, long dbId) throws DdlException { + public Database recoverDatabase(String dbName, long dbId) throws DdlException { RecycleDatabaseInfo dbInfo = null; // The recycle time of the force dropped tables and databases will be set to zero, use 1 here to // skip these databases and tables. @@ -628,7 +890,7 @@ public synchronized Database recoverDatabase(String dbName, long dbId) throws Dd return db; } - public synchronized Database replayRecoverDatabase(long dbId) { + public Database replayRecoverDatabase(long dbId) { RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId); try { @@ -675,7 +937,7 @@ private void recoverAllTables(RecycleDatabaseInfo dbInfo) throws DdlException { } } - public synchronized boolean recoverTable(Database db, String tableName, long tableId, + public boolean recoverTable(Database db, String tableName, long tableId, String newTableName) throws DdlException { // make sure to get db lock Table table = null; @@ -721,7 +983,7 @@ public synchronized boolean recoverTable(Database db, String tableName, long tab return true; } - public synchronized void replayRecoverTable(Database db, long tableId, String newTableName) throws DdlException { + public void replayRecoverTable(Database db, long tableId, String newTableName) throws DdlException { // make sure to get db write lock Iterator> iterator = idToTable.entrySet().iterator(); while (iterator.hasNext()) { @@ -739,7 +1001,7 @@ public synchronized void replayRecoverTable(Database db, long tableId, String ne } } - private synchronized boolean innerRecoverTable(Database db, Table table, String tableName, String newTableName, + private boolean innerRecoverTable(Database db, Table table, String tableName, String newTableName, Iterator> iterator, boolean isReplay) throws DdlException { table.writeLock(); @@ -788,7 +1050,7 @@ private synchronized boolean innerRecoverTable(Database db, Table table, String return true; } - public synchronized void recoverPartition(long dbId, OlapTable table, String partitionName, + public void recoverPartition(long dbId, OlapTable table, String partitionName, long partitionIdToRecover, String newPartitionName) throws DdlException { if (table.getType() == TableType.MATERIALIZED_VIEW) { throw new DdlException("Can not recover partition in materialized view: " + table.getName()); @@ -881,7 +1143,7 @@ public synchronized void recoverPartition(long dbId, OlapTable table, String par } // The caller should keep table write lock - public synchronized void replayRecoverPartition(OlapTable table, long partitionId, + public void replayRecoverPartition(OlapTable table, long partitionId, String newPartitionName) throws DdlException { Iterator> iterator = idToPartition.entrySet().iterator(); while (iterator.hasNext()) { @@ -923,7 +1185,7 @@ public synchronized void replayRecoverPartition(OlapTable table, long partitionI } // 
erase database in catalog recycle bin instantly - public synchronized void eraseDatabaseInstantly(long dbId) throws DdlException { + public void eraseDatabaseInstantly(long dbId) throws DdlException { // 1. find dbInfo and erase db RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId); if (dbInfo != null) { @@ -974,7 +1236,7 @@ public synchronized void eraseDatabaseInstantly(long dbId) throws DdlException { } // erase table in catalog recycle bin instantly - public synchronized void eraseTableInstantly(long tableId) throws DdlException { + public void eraseTableInstantly(long tableId) throws DdlException { // 1. find tableInfo and erase table RecycleTableInfo tableInfo = idToTable.get(tableId); if (tableInfo != null) { @@ -1016,7 +1278,7 @@ public synchronized void eraseTableInstantly(long tableId) throws DdlException { } // erase partition in catalog recycle bin instantly - public synchronized void erasePartitionInstantly(long partitionId) throws DdlException { + public void erasePartitionInstantly(long partitionId) throws DdlException { // 1. find partitionInfo to erase RecyclePartitionInfo partitionInfo = idToPartition.get(partitionId); if (partitionInfo == null) { @@ -1136,7 +1398,7 @@ protected void runAfterCatalogReady() { eraseDatabase(currentTimeMs, keepNum); } - public synchronized List> getInfo() { + public List> getInfo() { Map> dbToDataSize = new HashMap<>(); List> tableInfos = Lists.newArrayList(); for (Map.Entry entry : idToTable.entrySet()) { @@ -1148,7 +1410,6 @@ public synchronized List> getInfo() { info.add(String.valueOf(tableInfo.getDbId())); info.add(String.valueOf(entry.getKey())); info.add(""); - //info.add(String.valueOf(idToRecycleTime.get(entry.getKey()))); info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey()))); // data size long dataSize = table.getDataSize(false); @@ -1189,7 +1450,6 @@ public synchronized List> getInfo() { info.add(String.valueOf(partitionInfo.getDbId())); info.add(String.valueOf(partitionInfo.getTableId())); info.add(String.valueOf(entry.getKey())); - //info.add(String.valueOf(idToRecycleTime.get(entry.getKey()))); info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey()))); // data size long dataSize = partition.getDataSize(false); @@ -1230,7 +1490,6 @@ public synchronized List> getInfo() { info.add(String.valueOf(entry.getKey())); info.add(""); info.add(""); - //info.add(String.valueOf(idToRecycleTime.get(entry.getKey()))); info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey()))); // data size Pair dataSizePair = dbToDataSize.getOrDefault(entry.getKey(), Pair.of(0L, 0L)); @@ -1253,7 +1512,7 @@ public synchronized List> getInfo() { return Stream.of(dbInfos, tableInfos, partitionInfos).flatMap(Collection::stream).collect(Collectors.toList()); } - public synchronized Map> getDbToRecycleSize() { + public Map> getDbToRecycleSize() { Map> dbToRecycleSize = new HashMap<>(); for (Map.Entry entry : idToTable.entrySet()) { RecycleTableInfo tableInfo = entry.getValue(); @@ -1295,7 +1554,7 @@ public synchronized Map> getDbToRecycleSize() { // Need to add "synchronized", because when calling /dump api to dump image, // this class is not protected by any lock, will throw ConcurrentModificationException. 
@Override - public synchronized void write(DataOutput out) throws IOException { + public void write(DataOutput out) throws IOException { out.writeInt(idToDatabase.size()); for (Map.Entry entry : idToDatabase.entrySet()) { out.writeLong(entry.getKey()); @@ -1653,4 +1912,50 @@ public void readFields(DataInput in) throws IOException { public List getAllDbIds() { return Lists.newArrayList(idToDatabase.keySet()); } + + private List getSameNameDbIdListToErase(String dbName, int maxSameNameTrashNum) { + readLock.lock(); + try { + List> dbRecycleTimeLists = Lists.newArrayList(); + for (Map.Entry entry : idToDatabase.entrySet()) { + RecycleDatabaseInfo dbInfo = entry.getValue(); + Database db = dbInfo.getDb(); + if (db.getFullName().equals(dbName)) { + List dbRecycleTimeInfo = Lists.newArrayList(); + dbRecycleTimeInfo.add(entry.getKey()); + dbRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey())); + dbRecycleTimeLists.add(dbRecycleTimeInfo); + } + } + + List dbIdToErase = Lists.newArrayList(); + if (dbRecycleTimeLists.size() <= maxSameNameTrashNum) { + return dbIdToErase; + } + + // order by recycle time desc + dbRecycleTimeLists.sort((x, y) -> + (x.get(1).longValue() < y.get(1).longValue()) ? 1 : ((x.get(1).equals(y.get(1))) ? 0 : -1)); + + for (int i = maxSameNameTrashNum; i < dbRecycleTimeLists.size(); i++) { + dbIdToErase.add(dbRecycleTimeLists.get(i).get(0)); + } + return dbIdToErase; + } finally { + readLock.unlock(); + } + } + + private boolean isExpireMinLatency(long id, long currentTimeMs) { + readLock.lock(); + try { + Long recycleTime = idToRecycleTime.get(id); + if (recycleTime == null) { + return false; + } + return currentTimeMs - recycleTime > minEraseLatency; + } finally { + readLock.unlock(); + } + } }
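// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): an optional helper that keeps
// the lock/try/finally boilerplate in one place. Each guarded method above
// could be expressed as withLock(readLock, ...) or withLock(writeLock, ...);
// shown only as an idiom, the patch itself spells the pattern out explicitly.
private <T> T withLock(java.util.concurrent.locks.Lock lock, java.util.function.Supplier<T> action) {
    lock.lock();
    try {
        return action.get();
    } finally {
        lock.unlock();
    }
}

// usage sketch: return withLock(readLock, () -> idToRecycleTime.get(id));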