Skip to content

Commit

Permalink
remove() refactored as suggested on jbarrett's review
Browse files Browse the repository at this point in the history
  • Loading branch information
albertogpz committed Dec 14, 2021
1 parent 3de17bb commit d6d80b2
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,10 @@ public synchronized void remove() throws CacheException {
if (peekedIds.isEmpty()) {
return;
}
boolean wasEmpty = lastDispatchedKey == lastDestroyedKey;
Long key = peekedIds.remove();
updateHeadKey(key);
lastDispatchedKey = key;
final boolean wasEmpty = lastDispatchedKey == lastDestroyedKey;
final Long key = peekedIds.remove();

preProcessRemovedKey(key);

removeIndex(key);
// Remove the entry at that key with a callback arg signifying it is
Expand All @@ -313,6 +313,8 @@ public synchronized void remove() throws CacheException {
}
}

postProcessRemovedKey();

if (wasEmpty) {
synchronized (this) {
notifyAll();
Expand All @@ -326,6 +328,14 @@ public synchronized void remove() throws CacheException {
}
}

protected void preProcessRemovedKey(final Long key) {
updateHeadKey(key);
lastDispatchedKey = key;
}

protected void postProcessRemovedKey() {}


/**
* This method removes batchSize entries from the queue. It will only remove entries that were
* previously peeked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import java.util.function.Predicate;

import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
Expand Down Expand Up @@ -198,37 +196,17 @@ protected List<KeyAndEventPair> getElementsMatching(Predicate<GatewaySenderEvent
}

@Override
public synchronized void remove() throws CacheException {
if (peekedIds.isEmpty()) {
return;
}
boolean wasEmpty = lastDispatchedKey == lastDestroyedKey;
Long key = peekedIds.remove();
protected void preProcessRemovedKey(Long key) {
boolean isExtraPeekedId = extraPeekedIds.contains(key);
if (!isExtraPeekedId) {
updateHeadKey(key);
lastDispatchedKey = key;
super.preProcessRemovedKey(key);
} else {
extraPeekedIdsRemovedButPreviousIdNotRemoved.add(key);
}
removeIndex(key);
// Remove the entry at that key with a callback arg signifying it is
// a WAN queue so that AbstractRegionEntry.destroy can get the value
// even if it has been evicted to disk. In the normal case, the
// AbstractRegionEntry.destroy only gets the value in the VM.
try {
this.region.localDestroy(key, WAN_QUEUE_TOKEN);
this.stats.decQueueSize();
} catch (EntryNotFoundException ok) {
// this is acceptable because the conflation can remove entries
// out from underneath us.
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.",
this, key);
}
}
}

@Override
protected void postProcessRemovedKey() {
// For those extraPeekedIds removed that are consecutive to lastDispatchedKey:
// - Update lastDispatchedKey with them so that they are removed
// by the batch removal thread.
Expand All @@ -238,20 +216,7 @@ public synchronized void remove() throws CacheException {
while (extraPeekedIdsRemovedButPreviousIdNotRemoved.contains(tmpKey = inc(tmpKey))) {
extraPeekedIdsRemovedButPreviousIdNotRemoved.remove(tmpKey);
extraPeekedIds.remove(tmpKey);
updateHeadKey(tmpKey);
lastDispatchedKey = tmpKey;
}

if (wasEmpty) {
synchronized (this) {
notifyAll();
}
}

if (logger.isDebugEnabled()) {
logger.debug(
"{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}",
this, key, this.lastDispatchedKey, this.lastDestroyedKey);
super.preProcessRemovedKey(tmpKey);
}
}

Expand Down

0 comments on commit d6d80b2

Please sign in to comment.