Skip to content

Commit

Permalink
ARTEMIS-5338 AckManager flow control
Browse files Browse the repository at this point in the history
Say there are too many records in the JournalHashMap. In that case
the sender should pace sending more data. The target should limit
credits when the configured max pending records is reached.
  • Loading branch information
clebertsuconic committed Mar 6, 2025
1 parent 2aaf256 commit fb2e0d9
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public static String getMirrorAddress(String connectionName) {

private boolean directDeliver = true;

// Limit of pending mirror acks, read and written through getMirrorMaxPendingAcks() / setMirrorMaxPendingAcks(int); starts at 10,000.
private int mirrorMaxPendingAcks = 10_000;

private final AMQPRoutingHandler routingHandler;

/*
Expand Down Expand Up @@ -164,6 +166,16 @@ public int getAmqpMinLargeMessageSize() {
return amqpMinLargeMessageSize;
}

/**
 * Returns the currently configured limit of pending mirror acks.
 *
 * @return the value last passed to {@link #setMirrorMaxPendingAcks(int)}, or the field's initial value when it was never set
 */
public int getMirrorMaxPendingAcks() {
   return this.mirrorMaxPendingAcks;
}

/**
 * Sets the limit of pending mirror acks.
 *
 * @param maxPendingAcks the new limit
 * @return this protocol manager, so calls can be chained
 */
public ProtonProtocolManager setMirrorMaxPendingAcks(int maxPendingAcks) {
   this.mirrorMaxPendingAcks = maxPendingAcks;
   return this;
}

public ProtonProtocolManager setAmqpMinLargeMessageSize(int amqpMinLargeMessageSize) {
this.amqpMinLargeMessageSize = amqpMinLargeMessageSize;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,27 @@ public static MirrorController getControllerInUse() {
return CONTROLLER_THREAD_LOCAL.get();
}


/**
 * Reports whether the ack manager currently holds more entries than the configured maximum of pending acks.
 *
 * @return {@code true} when {@code getAckManager().size()} is strictly greater than {@link #getMirrorMaxPendingAcks()}
 */
@Override
public boolean isBusy() {
   final int pendingAcks = getAckManager().size();
   final int maxPendingAcks = getMirrorMaxPendingAcks();
   return pendingAcks > maxPendingAcks;
}

/**
 * Looks up the pending-acks limit on the connection's protocol manager.
 * <p>
 * Any failure during the lookup is logged as a warning and the method answers {@code 0} instead of propagating it.
 *
 * @return the limit reported by the protocol manager, or {@code 0} when it could not be read
 */
public int getMirrorMaxPendingAcks() {
   int maxPendingAcks = 0;
   try {
      maxPendingAcks = connection.getProtocolManager().getMirrorMaxPendingAcks();
   } catch (Throwable e) {
      // Not expected to happen; if it does, log it and keep the 0 fallback
      logger.warn(e.getMessage(), e);
   }
   return maxPendingAcks;
}

/**
 * Hands {@code creditTopUpRunner} to {@code connection.runNow(...)}.
 * <p>
 * NOTE(review): presumably this re-evaluates whether more credits should be granted to the sender; confirm against the
 * definition of {@code creditTopUpRunner}.
 */
public void verifyCredits() {
connection.runNow(creditTopUpRunner);
}


/**
* Objects of this class can be used by either transaction or by OperationContext. It is important that when you're
* using the transactions you clear any references to the operation context. Don't use transaction and
Expand Down Expand Up @@ -473,14 +494,17 @@ private void performAck(String nodeID,
logger.trace("performAck (nodeID={}, messageID={}), targetQueue={})", nodeID, messageID, targetQueue.getName());
}

getAckManager().ack(nodeID, targetQueue, messageID, reason, true);

OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation, OperationConsistencyLevel.FULL);
}

/**
 * Returns the AckManager used by this target, resolving it on first use.
 * <p>
 * On the first call the manager is obtained from {@code AckManagerProvider.getManager(server)} and this target is
 * registered with it through {@code registerMirror(this)}; later calls return the cached instance.
 *
 * @return the cached AckManager
 */
private AckManager getAckManager() {
   if (ackManager == null) {
      ackManager = AckManagerProvider.getManager(server);
      ackManager.registerMirror(this);
   }
   return ackManager;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.LongSupplier;

import io.netty.util.collection.LongObjectHashMap;
Expand Down Expand Up @@ -78,6 +79,13 @@ public class AckManager implements ActiveMQComponent {
volatile MultiStepProgress progress;
ActiveMQScheduledComponent scheduledComponent;

/** Updater used for the atomic increments and decrements applied to {@code size}. */
private static final AtomicIntegerFieldUpdater<AckManager> sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(AckManager.class, "size");

/** Counter behind {@link #size()}; it is only modified through {@code sizeUpdater}. */
private volatile int size;

/**
 * Returns the current value of the entry counter.
 *
 * @return the counter value as seen by a volatile read
 */
public int size() {
   return size;
}

public AckManager(ActiveMQServer server) {
assert server != null && server.getConfiguration() != null;
this.server = server;
Expand All @@ -92,6 +100,7 @@ public AckManager(ActiveMQServer server) {

/**
 * Passes a reloaded record to the journal hash map provider and adds one to the counter returned by {@link #size()}.
 * <p>
 * The counter is incremented only when {@code journalHashMapProvider.reload} returns normally.
 * NOTE(review): every call adds exactly one; confirm that each record given here corresponds to one new entry.
 *
 * @param recordInfo the record being reloaded
 */
public void reload(RecordInfo recordInfo) {
journalHashMapProvider.reload(recordInfo);
sizeUpdater.incrementAndGet(this);
}

@Override
Expand All @@ -104,6 +113,13 @@ public synchronized void stop() {
logger.debug("Stopping ackmanager on server {}", server);
}

/**
 * Stops the scheduled component, when there is one, and drops the reference to it.
 * <p>
 * Calling this while no scheduled component is set does nothing.
 */
public synchronized void pause() {
   if (scheduledComponent == null) {
      return;
   }
   scheduledComponent.stop();
   scheduledComponent = null;
}

@Override
public synchronized boolean isStarted() {
return scheduledComponent != null && scheduledComponent.isStarted();
Expand Down Expand Up @@ -188,6 +204,11 @@ private void flushMirrorTargets() {
targetCopy.forEach(AMQPMirrorControllerTarget::flush);
}

/**
 * Calls {@code verifyCredits()} on every registered mirror target.
 * <p>
 * The loop runs over the list returned by {@link #copyTargets()}, not over the registered collection itself.
 */
private void checkFlowControlMirrorTargets() {
   for (AMQPMirrorControllerTarget target : copyTargets()) {
      target.verifyCredits();
   }
}

/**
 * Builds a new list holding the mirror targets registered at the time of the call.
 * <p>
 * The copy is made while holding this object's monitor.
 *
 * @return a new {@code ArrayList} with the contents of {@code mirrorControllerTargets}
 */
private synchronized List<AMQPMirrorControllerTarget> copyTargets() {
   final List<AMQPMirrorControllerTarget> snapshot = new ArrayList<>(mirrorControllerTargets);
   return snapshot;
}
Expand Down Expand Up @@ -262,11 +283,14 @@ public void retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<
page.usageDown();
}
}
logger.info("Checking flow control again... size={}", size());
validateExpiredSet(address, acksToRetry);
} else {
logger.trace("Page Scan not required for address {}", address);
}

checkFlowControlMirrorTargets();

} catch (Throwable e) {
logger.warn(e.getMessage(), e);
} finally {
Expand Down Expand Up @@ -299,7 +323,10 @@ private void validateExpireSet(SimpleString address, long queueID, JournalHashMa
if (logger.isDebugEnabled()) {
logger.debug("Retried {} {} times, giving up on the entry now. Configured Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
}
retries.remove(retry);
if (retries.remove(retry) != null) {
sizeUpdater.decrementAndGet(AckManager.this);
logger.info("removed, size {}", size);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Retry {} attempted {} times on paging, Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
Expand Down Expand Up @@ -353,7 +380,10 @@ private void retryPage(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queu
}
}
}
retries.remove(ackRetry, transaction.getID());
if (retries.remove(ackRetry, transaction.getID()) != null) {
sizeUpdater.decrementAndGet(AckManager.this);
logger.info("Removing entry size::{}", size);
}
transaction.setContainsPersistent();
logger.trace("retry performed ok, ackRetry={} for message={} on queue", ackRetry, pagedMessage);
}
Expand Down Expand Up @@ -389,6 +419,8 @@ private boolean checkRetriesAndPaging(LongObjectHashMap<JournalHashMap<AckRetry,
if (ack(retry.getNodeID(), queue, retry.getMessageID(), retry.getReason(), false)) {
logger.trace("Removing retry {} as the retry went ok", retry);
queueRetries.remove(retry);
sizeUpdater.decrementAndGet(this);
logger.info("removing {}", size);
} else {
int retried = retry.attemptedQueue();
if (logger.isTraceEnabled()) {
Expand All @@ -410,6 +442,8 @@ public synchronized void addRetry(String nodeID, Queue queue, long messageID, Ac
}
AckRetry retry = new AckRetry(nodeID, messageID, reason);
journalHashMapProvider.getMap(queue.getID(), queue).put(retry, retry);
sizeUpdater.incrementAndGet(this);
logger.info("Adding a retry {}", size);
if (scheduledComponent != null) {
// we set the retry delay again in case it was changed.
scheduledComponent.setPeriod(configuration.getMirrorAckManagerRetryDelay());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public static int getSize() {
}

public static AckManager getManager(ActiveMQServer server) {
if (server == null) {
throw new NullPointerException("server is null");
}
synchronized (managerHashMap) {
AckManager ackManager = managerHashMap.get(server);
if (ackManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
Expand Down Expand Up @@ -175,6 +176,10 @@ public boolean isStarted() {
return state == ReceiverState.STARTED;
}

/**
 * Tells whether this receiver is busy. This implementation always answers {@code false}.
 * <p>
 * NOTE(review): the credit top-up code shown in this change only flows credits when {@code context.isBusy()} is
 * {@code false}; presumably subclasses override this to hold credits back. Confirm against the callers.
 *
 * @return {@code false}
 */
public boolean isBusy() {
return false;
}

/**
 * Tells whether this receiver is in the stopping state.
 *
 * @return {@code true} when {@code state} is {@code ReceiverState.STOPPING}
 */
public boolean isStopping() {
   return ReceiverState.STOPPING == state;
}
Expand Down Expand Up @@ -276,16 +281,21 @@ public void run() {
if (connection.isHandler()) {
connection.requireInHandler();

if (context.isStarted()) {
if (context.isStarted() && !context.isBusy()) {
final int pending = context.pendingSettles;

if (isBellowThreshold(receiver.getCredit(), pending, threshold)) {
int topUp = calculatedUpdateRefill(refill, receiver.getCredit(), pending);
if (topUp > 0) {
if (context instanceof AMQPMirrorControllerTarget) {
logger.info("stuff is flowing {}", topUp);
}
receiver.flow(topUp);
connection.instantFlush();
}
}
} else {
logger.info("WOW, context is busy!!!");
}
} else {
// This must run on the connection thread as it interacts with proton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ public static void assertTrue(String failureMessage, Condition condition, final
assertTrue(failureMessage, condition, duration, SLEEP_MILLIS);
}

/**
 * Polls {@code supplier} through {@code waitFor} until it yields a non-null value, and fails the assertion if the wait
 * does not succeed.
 * <p>
 * The value returned is the one that satisfied the wait, so the result is never {@code null} when the assertion passes.
 *
 * @param supplier source of the value being waited for
 * @param duration maximum time to wait, passed through to {@code waitFor} (presumably milliseconds; confirm against waitFor)
 * @param sleep    pause between polls, passed through to {@code waitFor} (presumably milliseconds; confirm against waitFor)
 * @param <T>      type of the awaited value
 * @return the non-null value produced by {@code supplier}
 * @throws Exception if {@code waitFor} or the supplier throws
 */
public static <T> T assertNotNull(Supplier<T> supplier, final long duration, final long sleep) throws Exception {
   // Keep the value that satisfied the wait: asking the supplier again afterwards could observe null.
   final java.util.concurrent.atomic.AtomicReference<T> found = new java.util.concurrent.atomic.AtomicReference<>();
   Assertions.assertTrue(waitFor(() -> {
      found.set(supplier.get());
      return found.get() != null;
   }, duration, sleep));
   return found.get();
}

/**
 * Delegates to the four-argument {@code assertTrue}, using {@code DEFAULT_FAILURE_MESSAGE} as the failure message.
 *
 * @param condition the condition handed to the four-argument overload
 * @param duration  passed through unchanged to the four-argument overload
 * @param sleep     passed through unchanged to the four-argument overload
 */
public static void assertTrue(Condition condition, final long duration, final long sleep) {
assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, sleep);
}
Expand Down
Loading

0 comments on commit fb2e0d9

Please sign in to comment.