From 93108059113e4eb19edb4eb34291155ea60fc56a Mon Sep 17 00:00:00 2001 From: /bin/eash Date: Tue, 18 Feb 2025 19:48:26 +0800 Subject: [PATCH] [Enhancement](lock) Optimize CatalogRecycleBin lock granularity to improve concurrency --- .../doris/catalog/CatalogRecycleBin.java | 193 +++++++++++------- 1 file changed, 120 insertions(+), 73 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index b5899435343b13b..a80996ebf420133 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -52,12 +52,15 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -68,6 +71,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable, GsonPos // to avoid erase log ahead of drop log private static final long minEraseLatency = 10 * 60 * 1000; // 10 min + private final ReadWriteLock lock = new ReentrantReadWriteLock(true); private Map idToDatabase; private Map idToTable; private Map idToPartition; @@ -186,23 +190,28 @@ public synchronized boolean recycleTable(long dbId, Table table, boolean isRepla return true; } - public synchronized boolean recyclePartition(long dbId, long tableId, String tableName, Partition partition, + public boolean recyclePartition(long dbId, long tableId, String tableName, Partition partition, Range range, PartitionItem listPartitionItem, DataProperty dataProperty, ReplicaAllocation replicaAlloc, boolean isInMemory, boolean isMutable) { - if (idToPartition.containsKey(partition.getId())) { - LOG.error("partition[{}] already in recycle bin.", partition.getId()); - return false; - } + lock.writeLock().lock(); + try { + if (idToPartition.containsKey(partition.getId())) { + LOG.error("partition[{}] already in recycle bin.", partition.getId()); + return false; + } - // recycle partition - RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId, tableId, partition, - range, listPartitionItem, dataProperty, replicaAlloc, isInMemory, isMutable); - idToRecycleTime.put(partition.getId(), System.currentTimeMillis()); - idToPartition.put(partition.getId(), partitionInfo); - LOG.info("recycle partition[{}-{}] of table [{}-{}]", partition.getId(), partition.getName(), - tableId, tableName); - return true; + // recycle partition + RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId, tableId, partition, + range, listPartitionItem, dataProperty, replicaAlloc, isInMemory, isMutable); + idToRecycleTime.put(partition.getId(), System.currentTimeMillis()); + idToPartition.put(partition.getId(), partitionInfo); + LOG.info("recycle partition[{}-{}] of table [{}-{}]", partition.getId(), partition.getName(), + tableId, tableName); + return true; + } finally { + lock.writeLock().unlock(); + } } public synchronized Long getRecycleTimeById(long id) { @@ -474,47 +483,66 @@ public synchronized void replayEraseTable(long tableId) { LOG.info("replay erase table[{}]", tableId); } - private synchronized void erasePartition(long currentTimeMs, int keepNum) { + private void erasePartition(long currentTimeMs, int keepNum) { int eraseNum = 0; StopWatch watch = StopWatch.createStarted(); - try { - // 1. erase expired partitions - Iterator> iterator = idToPartition.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - RecyclePartitionInfo partitionInfo = entry.getValue(); - Partition partition = partitionInfo.getPartition(); + List partitionsToErase = new ArrayList<>(); - long partitionId = entry.getKey(); - if (isExpire(partitionId, currentTimeMs)) { - Env.getCurrentEnv().onErasePartition(partition); - // erase partition - iterator.remove(); - idToRecycleTime.remove(partitionId); - // log - Env.getCurrentEnv().getEditLog().logErasePartition(partitionId); - LOG.info("erase partition[{}]. reason: expired", partitionId); - eraseNum++; + try { + // 1. First collect expired partitions under write lock + lock.writeLock().lock(); + try { + Iterator> iterator = idToPartition.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + RecyclePartitionInfo partitionInfo = entry.getValue(); + Partition partition = partitionInfo.getPartition(); + long partitionId = entry.getKey(); + + if (isExpire(partitionId, currentTimeMs)) { + partitionsToErase.add(partition); + iterator.remove(); + idToRecycleTime.remove(partitionId); + eraseNum++; + } } - } // end for partitions - - // 2. erase exceed number - if (keepNum < 0) { - return; + } finally { + lock.writeLock().unlock(); } - com.google.common.collect.Table> dbTblId2PartitionNames = HashBasedTable.create(); - for (RecyclePartitionInfo partitionInfo : idToPartition.values()) { - Set partitionNames = dbTblId2PartitionNames.get(partitionInfo.dbId, partitionInfo.tableId); - if (partitionNames == null) { - partitionNames = Sets.newHashSet(); - dbTblId2PartitionNames.put(partitionInfo.dbId, partitionInfo.tableId, partitionNames); - } - partitionNames.add(partitionInfo.getPartition().getName()); + + // 2. Then erase partitions outside of lock + for (Partition partition : partitionsToErase) { + Env.getCurrentEnv().onErasePartition(partition); + // log + Env.getCurrentEnv().getEditLog().logErasePartition(partition.getId()); + LOG.info("erase partition[{}]. reason: expired", partition.getId()); } - for (Cell> cell : dbTblId2PartitionNames.cellSet()) { - for (String partitionName : cell.getValue()) { - erasePartitionWithSameName(cell.getRowKey(), cell.getColumnKey(), partitionName, currentTimeMs, - keepNum); + + // 3. Handle exceed number case + if (keepNum >= 0) { + lock.readLock().lock(); + try { + com.google.common.collect.Table> dbTblId2PartitionNames = + HashBasedTable.create(); + for (RecyclePartitionInfo partitionInfo : idToPartition.values()) { + Set partitionNames = + dbTblId2PartitionNames.get(partitionInfo.dbId, partitionInfo.tableId); + if (partitionNames == null) { + partitionNames = Sets.newHashSet(); + dbTblId2PartitionNames.put(partitionInfo.dbId, partitionInfo.tableId, partitionNames); + } + partitionNames.add(partitionInfo.getPartition().getName()); + } + + for (Cell> cell : dbTblId2PartitionNames.cellSet()) { + for (String partitionName : cell.getValue()) { + erasePartitionWithSameName(cell.getRowKey(), cell.getColumnKey(), + partitionName, currentTimeMs, + keepNum); + } + } + } finally { + lock.readLock().unlock(); } } } finally { @@ -523,26 +551,31 @@ private synchronized void erasePartition(long currentTimeMs, int keepNum) { } } - private synchronized List getSameNamePartitionIdListToErase(long dbId, long tableId, String partitionName, + private List getSameNamePartitionIdListToErase(long dbId, long tableId, String partitionName, int maxSameNameTrashNum) { - Iterator> iterator = idToPartition.entrySet().iterator(); List> partitionRecycleTimeLists = Lists.newArrayList(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - RecyclePartitionInfo partitionInfo = entry.getValue(); - if (partitionInfo.getDbId() != dbId || partitionInfo.getTableId() != tableId) { - continue; - } - Partition partition = partitionInfo.getPartition(); - if (partition.getName().equals(partitionName)) { - List partitionRecycleTimeInfo = Lists.newArrayList(); - partitionRecycleTimeInfo.add(entry.getKey()); - partitionRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey())); + lock.readLock().lock(); + try { + for (Map.Entry entry : idToPartition.entrySet()) { + RecyclePartitionInfo partitionInfo = entry.getValue(); + if (partitionInfo.getDbId() != dbId || partitionInfo.getTableId() != tableId) { + continue; + } - partitionRecycleTimeLists.add(partitionRecycleTimeInfo); + Partition partition = partitionInfo.getPartition(); + if (partition.getName().equals(partitionName)) { + List partitionRecycleTimeInfo = Lists.newArrayList(); + partitionRecycleTimeInfo.add(entry.getKey()); + partitionRecycleTimeInfo.add(idToRecycleTime.get(entry.getKey())); + + partitionRecycleTimeLists.add(partitionRecycleTimeInfo); + } } + } finally { + lock.readLock().unlock(); } + List partitionIdToErase = Lists.newArrayList(); if (partitionRecycleTimeLists.size() <= maxSameNameTrashNum) { return partitionIdToErase; @@ -559,20 +592,34 @@ private synchronized List getSameNamePartitionIdListToErase(long dbId, lon private synchronized void erasePartitionWithSameName(long dbId, long tableId, String partitionName, long currentTimeMs, int maxSameNameTrashNum) { - List partitionIdToErase = getSameNamePartitionIdListToErase(dbId, tableId, partitionName, - maxSameNameTrashNum); - for (Long partitionId : partitionIdToErase) { - RecyclePartitionInfo partitionInfo = idToPartition.get(partitionId); - if (!isExpireMinLatency(partitionId, currentTimeMs)) { - continue; + List partitionIdToErase; + List partitionsToErase = new ArrayList<>(); + + // First get partitions to erase under write lock + lock.writeLock().lock(); + try { + partitionIdToErase = getSameNamePartitionIdListToErase(dbId, tableId, partitionName, maxSameNameTrashNum); + for (Long partitionId : partitionIdToErase) { + RecyclePartitionInfo partitionInfo = idToPartition.get(partitionId); + if (!isExpireMinLatency(partitionId, currentTimeMs)) { + continue; + } + Partition partition = partitionInfo.getPartition(); + partitionsToErase.add(partition); + + idToPartition.remove(partitionId); + idToRecycleTime.remove(partitionId); } - Partition partition = partitionInfo.getPartition(); + } finally { + lock.writeLock().unlock(); + } + // Then erase partitions outside of lock + for (Partition partition : partitionsToErase) { Env.getCurrentEnv().onErasePartition(partition); - idToPartition.remove(partitionId); - idToRecycleTime.remove(partitionId); - Env.getCurrentEnv().getEditLog().logErasePartition(partitionId); - LOG.info("erase partition[{}] name: {} from table[{}] from db[{}]", partitionId, partitionName, tableId, + Env.getCurrentEnv().getEditLog().logErasePartition(partition.getId()); + LOG.info("erase partition[{}] name: {} from table[{}] from db[{}]", + partition.getId(), partitionName, tableId, dbId); } }