From d6d80b29c7d7d21826a39d76a1f61867c53b37d7 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Mon, 13 Dec 2021 12:36:50 +0100 Subject: [PATCH] remove() refactored as suggested on jbarrett's review --- .../wan/serial/SerialGatewaySenderQueue.java | 18 +++++-- .../TxGroupingSerialGatewaySenderQueue.java | 47 +++---------------- 2 files changed, 20 insertions(+), 45 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java index a0b2fc22b3cf..e898121d001c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java @@ -290,10 +290,10 @@ public synchronized void remove() throws CacheException { if (peekedIds.isEmpty()) { return; } - boolean wasEmpty = lastDispatchedKey == lastDestroyedKey; - Long key = peekedIds.remove(); - updateHeadKey(key); - lastDispatchedKey = key; + final boolean wasEmpty = lastDispatchedKey == lastDestroyedKey; + final Long key = peekedIds.remove(); + + preProcessRemovedKey(key); removeIndex(key); // Remove the entry at that key with a callback arg signifying it is @@ -313,6 +313,8 @@ public synchronized void remove() throws CacheException { } } + postProcessRemovedKey(); + if (wasEmpty) { synchronized (this) { notifyAll(); @@ -326,6 +328,14 @@ public synchronized void remove() throws CacheException { } } + protected void preProcessRemovedKey(final Long key) { + updateHeadKey(key); + lastDispatchedKey = key; + } + + protected void postProcessRemovedKey() {} + + /** * This method removes batchSize entries from the queue. It will only remove entries that were * previously peeked. diff --git a/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingSerialGatewaySenderQueue.java b/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingSerialGatewaySenderQueue.java index bbca1c67b35f..4adc7f03786a 100644 --- a/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingSerialGatewaySenderQueue.java +++ b/geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingSerialGatewaySenderQueue.java @@ -27,9 +27,7 @@ import java.util.function.Predicate; import org.apache.geode.annotations.VisibleForTesting; -import org.apache.geode.cache.CacheException; import org.apache.geode.cache.CacheListener; -import org.apache.geode.cache.EntryNotFoundException; import org.apache.geode.cache.TransactionId; import org.apache.geode.cache.asyncqueue.AsyncEvent; import org.apache.geode.internal.cache.wan.AbstractGatewaySender; @@ -198,37 +196,17 @@ protected List getElementsMatching(Predicate