Skip to content

Commit

Permalink
HDDS-11201. Optimise FullTableCache eviction, scheduler and lock. (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
sumitagrawl authored Aug 2, 2024
1 parent 5118f23 commit d38372a
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@

package org.apache.hadoop.hdds.utils.db.cache;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -54,7 +58,8 @@ public class FullTableCache<KEY, VALUE> implements TableCache<KEY, VALUE> {

private final Map<CacheKey<KEY>, CacheValue<VALUE>> cache;
private final NavigableMap<Long, Set<CacheKey<KEY>>> epochEntries;
private final ExecutorService executorService;
private final ScheduledExecutorService executorService;
private final Queue<Long> epochCleanupQueue = new ConcurrentLinkedQueue<>();

private final ReadWriteLock lock;

Expand Down Expand Up @@ -82,8 +87,8 @@ public FullTableCache(String threadNamePrefix) {
.setDaemon(true)
.setNameFormat(threadNamePrefix + "FullTableCache-Cleanup-%d")
.build();
executorService = Executors.newSingleThreadExecutor(threadFactory);

executorService = Executors.newScheduledThreadPool(1, threadFactory);
executorService.scheduleWithFixedDelay(() -> cleanupTask(), 0, 1000L, TimeUnit.MILLISECONDS);
statsRecorder = new CacheStatsRecorder();
}

Expand Down Expand Up @@ -111,18 +116,31 @@ public void loadInitial(CacheKey<KEY> key, CacheValue<VALUE> value) {
@Override
public void put(CacheKey<KEY> cacheKey, CacheValue<VALUE> value) {
try {
lock.writeLock().lock();
lock.readLock().lock();
cache.put(cacheKey, value);
epochEntries.computeIfAbsent(value.getEpoch(),
v -> new CopyOnWriteArraySet<>()).add(cacheKey);
// add in case of null value for cleanup purpose only when key is deleted
if (value.getCacheValue() == null) {
epochEntries.computeIfAbsent(value.getEpoch(),
v -> new CopyOnWriteArraySet<>()).add(cacheKey);
}
} finally {
lock.writeLock().unlock();
lock.readLock().unlock();
}
}

@Override
public void cleanup(List<Long> epochs) {
executorService.execute(() -> evictCache(epochs));
epochCleanupQueue.clear();
epochCleanupQueue.addAll(epochs);
}

private void cleanupTask() {
if (epochCleanupQueue.isEmpty()) {
return;
}
ArrayList<Long> epochList = new ArrayList<>(epochCleanupQueue);
epochCleanupQueue.removeAll(epochList);
evictCache(epochList);
}

@Override
Expand All @@ -139,45 +157,48 @@ public Iterator<Map.Entry<CacheKey<KEY>, CacheValue<VALUE>>> iterator() {
@VisibleForTesting
@Override
public void evictCache(List<Long> epochs) {
// when no delete entries, can exit immediately
if (epochEntries.isEmpty()) {
return;
}

Set<CacheKey<KEY>> currentCacheKeys;
CacheKey<KEY> cachekey;
long lastEpoch = epochs.get(epochs.size() - 1);
for (long currentEpoch : epochEntries.keySet()) {
currentCacheKeys = epochEntries.get(currentEpoch);
// Acquire lock to avoid race between cleanup and add to cache entry by
// client requests.
try {
lock.writeLock().lock();
for (long currentEpoch : epochEntries.keySet()) {
currentCacheKeys = epochEntries.get(currentEpoch);

// If currentEntry epoch is greater than last epoch provided, we have
// deleted all entries less than specified epoch. So, we can break.
if (currentEpoch > lastEpoch) {
break;
}
// If currentEntry epoch is greater than last epoch provided, we have
// deleted all entries less than specified epoch. So, we can break.
if (currentEpoch > lastEpoch) {
break;
}

// Acquire lock to avoid race between cleanup and add to cache entry by
// client requests.
try {
lock.writeLock().lock();
if (epochs.contains(currentEpoch)) {
for (Iterator<CacheKey<KEY>> iterator = currentCacheKeys.iterator();
iterator.hasNext();) {
cachekey = iterator.next();
cache.computeIfPresent(cachekey, ((k, v) -> {
// If cache epoch entry matches with current Epoch, remove entry
// from cache.
if (v.getCacheValue() == null && v.getEpoch() == currentEpoch) {
if (LOG.isDebugEnabled()) {
LOG.debug("CacheKey {} with epoch {} is removed from cache",
k.getCacheKey(), currentEpoch);
}
return null;
for (Iterator<CacheKey<KEY>> iterator = currentCacheKeys.iterator();
iterator.hasNext();) {
cachekey = iterator.next();
cache.computeIfPresent(cachekey, ((k, v) -> {
// If cache epoch entry matches with current Epoch, remove entry
// from cache.
if (v.getCacheValue() == null && v.getEpoch() == currentEpoch) {
if (LOG.isDebugEnabled()) {
LOG.debug("CacheKey {} with epoch {} is removed from cache",
k.getCacheKey(), currentEpoch);
}
return v;
}));
}
return null;
}
return v;
}));
// Remove epoch entry, as the entry is there in epoch list.
epochEntries.remove(currentEpoch);
}
} finally {
lock.writeLock().unlock();
}
} finally {
lock.writeLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,12 @@ public void testTableCacheWithRenameKey(TableCache.CacheType cacheType) {
// Epoch entries should be like (long, (key1, key2, ...))
// (0, (0A, 0B)) (1, (1A, 1B)) (2, (2A, 1B))
assertEquals(3, tableCache.getEpochEntries().size());
assertEquals(2, tableCache.getEpochEntries().get(0L).size());
if (cacheType == TableCache.CacheType.FULL_CACHE) {
// first time cache value is null for xA cases and non-null value for xB cases, so have 1 entry
assertEquals(1, tableCache.getEpochEntries().get(0L).size());
} else {
assertEquals(2, tableCache.getEpochEntries().get(0L).size());
}

// Cache should be like (key, (cacheValue, long))
// (0A, (null, 0)) (0B, (0, 0))
Expand Down Expand Up @@ -227,9 +232,13 @@ public void testPartialTableCacheWithOverrideEntries(


assertEquals(3, tableCache.size());
// It will have 2 additional entries because we have 2 override entries.
assertEquals(3 + 2,
tableCache.getEpochEntries().size());
if (cacheType == TableCache.CacheType.FULL_CACHE) {
// full table cache keep only deleted entry which is 0
assertEquals(0, tableCache.getEpochEntries().size());
} else {
// It will have 2 additional entries because we have 2 override entries.
assertEquals(3 + 2, tableCache.getEpochEntries().size());
}

// Now remove

Expand Down Expand Up @@ -301,9 +310,13 @@ public void testPartialTableCacheWithOverrideAndDelete(


assertEquals(3, tableCache.size());
// It will have 4 additional entries because we have 4 override entries.
assertEquals(3 + 4,
tableCache.getEpochEntries().size());
if (cacheType == TableCache.CacheType.FULL_CACHE) {
// It will have 2 deleted entries
assertEquals(2, tableCache.getEpochEntries().size());
} else {
// It will have 4 additional entries because we have 4 override entries.
assertEquals(3 + 4, tableCache.getEpochEntries().size());
}

// Now remove

Expand Down Expand Up @@ -506,7 +519,12 @@ public void testTableCacheWithNonConsecutiveEpochList(
tableCache.evictCache(epochs);

assertEquals(2, tableCache.size());
assertEquals(2, tableCache.getEpochEntries().size());
if (cacheType == TableCache.CacheType.FULL_CACHE) {
// no deleted entries
assertEquals(0, tableCache.getEpochEntries().size());
} else {
assertEquals(2, tableCache.getEpochEntries().size());
}

assertNotNull(tableCache.get(new CacheKey<>(Long.toString(0))));
assertEquals(2,
Expand Down

0 comments on commit d38372a

Please sign in to comment.