Skip to content

Commit

Permalink
ARTEMIS-5342 Expose AckManager pending records on management
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Mar 7, 2025
1 parent 1a462dd commit 65691bb
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2866,4 +2866,13 @@ static void exportConfigAsProperties(Object source) {

@LogMessage(id = 601798, value = "User {} is exporting configuration as properties on target resource: {}", level = LogMessage.Level.INFO)
void exportConfigAsProperties(String user, Object source);

static void getPendingMirrorAcks(Object source) {
BASE_LOGGER.getPendingMirrorAcks(getCaller(), source);
}

@LogMessage(id = 601799, value = "User {} is getting PendingMirrorAcks on target resource: {}", level = LogMessage.Level.INFO)
void getPendingMirrorAcks(String user, Object source);


}
Original file line number Diff line number Diff line change
Expand Up @@ -2091,6 +2091,9 @@ void replay(@Parameter(name = "startScanDate", desc = "Start date where we will
@Attribute(desc = AUTHORIZATION_FAILURE_COUNT)
long getAuthorizationFailureCount();

@Attribute(desc = "Number of pending acknowledgements records on mirroring")
int getPendingMirrorAcks();

@Operation(desc = "Export the broker configuration as properties", impact = MBeanOperationInfo.ACTION)
void exportConfigAsProperties() throws Exception;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.mirror.MirrorRegistry;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
Expand All @@ -79,11 +80,10 @@ public class AckManager implements ActiveMQComponent {
volatile MultiStepProgress progress;
ActiveMQScheduledComponent scheduledComponent;

private volatile int size;
private static final AtomicIntegerFieldUpdater<AckManager> sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(AckManager.class, "size");
final MirrorRegistry mirrorRegistry;

public int size() {
return sizeUpdater.get(this);
return mirrorRegistry.getMirrorAckSize();
}

public AckManager(ActiveMQServer server) {
Expand All @@ -92,15 +92,15 @@ public AckManager(ActiveMQServer server) {
this.configuration = server.getConfiguration();
this.ioCriticalErrorListener = server.getIoCriticalErrorListener();
this.sequenceGenerator = server.getStorageManager()::generateID;

this.mirrorRegistry = server.getMirrorRegistry();
// The JournalHashMap has to use the storage manager to guarantee we are using the Replicated Journal Wrapper in case this is a replicated journal
journalHashMapProvider = new JournalHashMapProvider<>(sequenceGenerator, server.getStorageManager(), AckRetry.getPersister(), JournalRecordIds.ACK_RETRY, OperationContextImpl::getContext, server.getPostOffice()::findQueue, server.getIoCriticalErrorListener());
this.referenceIDSupplier = new ReferenceIDSupplier(server);
}

public void reload(RecordInfo recordInfo) {
journalHashMapProvider.reload(recordInfo);
sizeUpdater.incrementAndGet(this);
mirrorRegistry.incrementMirrorAckSize();
}

@Override
Expand Down Expand Up @@ -323,7 +323,7 @@ private void validateExpireSet(SimpleString address, long queueID, JournalHashMa
logger.debug("Retried {} {} times, giving up on the entry now. Configured Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
}
if (retries.remove(retry) != null) {
sizeUpdater.decrementAndGet(AckManager.this);
mirrorRegistry.decrementMirrorAckSize();
}
} else {
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -379,7 +379,7 @@ private void retryPage(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queu
}
}
if (retries.remove(ackRetry, transaction.getID()) != null) {
sizeUpdater.decrementAndGet(AckManager.this);
mirrorRegistry.decrementMirrorAckSize();
}
transaction.setContainsPersistent();
logger.trace("retry performed ok, ackRetry={} for message={} on queue", ackRetry, pagedMessage);
Expand Down Expand Up @@ -416,7 +416,7 @@ private boolean checkRetriesAndPaging(LongObjectHashMap<JournalHashMap<AckRetry,
if (ack(retry.getNodeID(), queue, retry.getMessageID(), retry.getReason(), false)) {
logger.trace("Removing retry {} as the retry went ok", retry);
queueRetries.remove(retry);
sizeUpdater.decrementAndGet(this);
mirrorRegistry.decrementMirrorAckSize();
} else {
int retried = retry.attemptedQueue();
if (logger.isTraceEnabled()) {
Expand All @@ -438,7 +438,7 @@ public synchronized void addRetry(String nodeID, Queue queue, long messageID, Ac
}
AckRetry retry = new AckRetry(nodeID, messageID, reason);
journalHashMapProvider.getMap(queue.getID(), queue).put(retry, retry);
sizeUpdater.incrementAndGet(this);
mirrorRegistry.incrementMirrorAckSize();
if (scheduledComponent != null) {
// we set the retry delay again in case it was changed.
scheduledComponent.setPeriod(configuration.getMirrorAckManagerRetryDelay());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4747,6 +4747,14 @@ public long getAuthorizationFailureCount() {
return server.getSecurityStore().getAuthorizationFailureCount();
}

@Override
public int getPendingMirrorAcks() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getPendingMirrorAcks(this.server);
}
return server.getPendingMirrorAcks();
}

@Override
public void exportConfigAsProperties() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.mirror.MirrorRegistry;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
Expand Down Expand Up @@ -1025,4 +1026,8 @@ default String getStatus() {
}

void registerRecordsLoader(Consumer<RecordInfo> recordsLoader);

MirrorRegistry getMirrorRegistry();

int getPendingMirrorAcks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.server.mirror.MirrorRegistry;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
Expand Down Expand Up @@ -259,6 +260,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {

private HAPolicy haPolicy;

private MirrorRegistry mirrorRegistry = new MirrorRegistry();

// This will be useful on tests or embedded
private boolean rebuildCounters = true;

Expand Down Expand Up @@ -4821,4 +4824,14 @@ public AutoCloseable managementLock() throws Exception {
public IOCriticalErrorListener getIoCriticalErrorListener() {
return ioCriticalErrorListener;
}
}

@Override
public MirrorRegistry getMirrorRegistry() {
return mirrorRegistry;
}

@Override
public int getPendingMirrorAcks() {
return mirrorRegistry.getMirrorAckSize();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.core.server.mirror;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class MirrorRegistry {

private volatile int mirrorAckSize;
private static final AtomicIntegerFieldUpdater<MirrorRegistry> sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(MirrorRegistry.class, "mirrorAckSize");

public int getMirrorAckSize() {
return sizeUpdater.get(this);
}

public void incrementMirrorAckSize() {
sizeUpdater.incrementAndGet(this);
}

public void decrementMirrorAckSize() {
sizeUpdater.decrementAndGet(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,18 @@ public void testGetAttributes() throws Exception {
assertTrue(serverControl.isActive());
}

@TestTemplate
public void testPendingMirrorAcks() throws Exception {
ActiveMQServerControl serverControl = createManagementControl();
// faking some data, to make sure we are not just returning a default value
for (int i = 0; i < 7; i++) {
server.getMirrorRegistry().incrementMirrorAckSize();
}

assertEquals(7, serverControl.getPendingMirrorAcks());
assertEquals(7, server.getPendingMirrorAcks());
}

@TestTemplate
public void testBrokerPluginClassNames() throws Exception {
ActiveMQServerControl serverControl = createManagementControl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.Attribute;
import org.apache.activemq.artemis.api.core.management.Parameter;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
Expand Down Expand Up @@ -1857,6 +1858,12 @@ public long getAuthorizationFailureCount() {
return (long) proxy.retrieveAttributeValue("authorizationFailureCount");
}


@Attribute(desc = "Number of pending acknowledgements records on mirroring")
public int getPendingMirrorAcks() {
return (int) proxy.retrieveAttributeValue("pendingMirrorAcks");
}

@Override
public void exportConfigAsProperties() throws Exception {
proxy.invokeOperation("exportConfigAsProperties");
Expand Down

0 comments on commit 65691bb

Please sign in to comment.