Skip to content

Commit

Permalink
ARTEMIS-5333 add config for global-max-size-percent-of-jvm-max-memory…
Browse files Browse the repository at this point in the history
… to simplify setting the global memory limit
  • Loading branch information
gtully authored and clebertsuconic committed Mar 6, 2025
1 parent 091c439 commit 710eda6
Show file tree
Hide file tree
Showing 20 changed files with 298 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ public static String getDefaultHapolicyBackupStrategy() {

public static final boolean DEFAULT_AMQP_USE_CORE_SUBSCRIPTION_NAMING = false;

public static final long DEFAULT_GLOBAL_MAX_SIZE = Runtime.getRuntime().maxMemory() / 2;
public static final int DEFAULT_GLOBAL_MAX_MEMORY_PERCENT = 50;

public static final long DEFAULT_GLOBAL_MAX_MESSAGES = -1;

Expand Down Expand Up @@ -1589,11 +1589,8 @@ public static boolean getDefaultAmqpUseCoreSubscriptionNaming() {
return DEFAULT_AMQP_USE_CORE_SUBSCRIPTION_NAMING;
}

/**
* The default global max size. -1 = no global max size.
*/
public static long getDefaultMaxGlobalSize() {
return DEFAULT_GLOBAL_MAX_SIZE;
public static long getDefaultMaxGlobalSizeAsPercentOfJvmMaxMemory(int percentOfJvmMaxMemory) {
return (long) (Runtime.getRuntime().maxMemory() * (percentOfJvmMaxMemory / 100.0f));
}

public static long getDefaultMaxGlobalMessages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public static String getMirrorAddress(String connectionName) {

private boolean directDeliver = true;

private int mirrorMaxPendingAcks = 10_000;

private final AMQPRoutingHandler routingHandler;

/*
Expand Down Expand Up @@ -164,6 +166,15 @@ public int getAmqpMinLargeMessageSize() {
return amqpMinLargeMessageSize;
}

public int getMirrorMaxPendingAcks() {
return mirrorMaxPendingAcks;
}

public ProtonProtocolManager setMirrorMaxPendingAcks(int maxPendingAcks) {
this.mirrorMaxPendingAcks = maxPendingAcks;
return this;
}

public ProtonProtocolManager setAmqpMinLargeMessageSize(int amqpMinLargeMessageSize) {
this.amqpMinLargeMessageSize = amqpMinLargeMessageSize;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,27 @@ public static MirrorController getControllerInUse() {
return CONTROLLER_THREAD_LOCAL.get();
}


@Override
public boolean isBusy() {
return getAckManager().size() > getMirrorMaxPendingAcks();
}

public int getMirrorMaxPendingAcks() {
try {
return connection.getProtocolManager().getMirrorMaxPendingAcks();
} catch (Throwable e) {
// It shouldn't happen, but if it did we just log it
logger.warn(e.getMessage(), e);
return 0;
}
}

public void verifyCredits() {
connection.runNow(creditTopUpRunner);
}


/**
* Objects of this class can be used by either transaction or by OperationContext. It is important that when you're
* using the transactions you clear any references to the operation context. Don't use transaction and
Expand Down Expand Up @@ -473,14 +494,17 @@ private void performAck(String nodeID,
logger.trace("performAck (nodeID={}, messageID={}), targetQueue={})", nodeID, messageID, targetQueue.getName());
}

getAckManager().ack(nodeID, targetQueue, messageID, reason, true);

OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation, OperationConsistencyLevel.FULL);
}

private AckManager getAckManager() {
if (ackManager == null) {
ackManager = AckManagerProvider.getManager(server);
ackManager.registerMirror(this);
}

ackManager.ack(nodeID, targetQueue, messageID, reason, true);

OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation, OperationConsistencyLevel.FULL);
return ackManager;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.LongSupplier;

import io.netty.util.collection.LongObjectHashMap;
Expand Down Expand Up @@ -78,6 +79,13 @@ public class AckManager implements ActiveMQComponent {
volatile MultiStepProgress progress;
ActiveMQScheduledComponent scheduledComponent;

private volatile int size;
private static final AtomicIntegerFieldUpdater<AckManager> sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(AckManager.class, "size");

public int size() {
return sizeUpdater.get(this);
}

public AckManager(ActiveMQServer server) {
assert server != null && server.getConfiguration() != null;
this.server = server;
Expand All @@ -92,6 +100,7 @@ public AckManager(ActiveMQServer server) {

public void reload(RecordInfo recordInfo) {
journalHashMapProvider.reload(recordInfo);
sizeUpdater.incrementAndGet(this);
}

@Override
Expand All @@ -104,6 +113,13 @@ public synchronized void stop() {
logger.debug("Stopping ackmanager on server {}", server);
}

public synchronized void pause() {
if (scheduledComponent != null) {
scheduledComponent.stop();
scheduledComponent = null;
}
}

@Override
public synchronized boolean isStarted() {
return scheduledComponent != null && scheduledComponent.isStarted();
Expand Down Expand Up @@ -188,6 +204,11 @@ private void flushMirrorTargets() {
targetCopy.forEach(AMQPMirrorControllerTarget::flush);
}

private void checkFlowControlMirrorTargets() {
List<AMQPMirrorControllerTarget> targetCopy = copyTargets();
targetCopy.forEach(AMQPMirrorControllerTarget::verifyCredits);
}

private synchronized List<AMQPMirrorControllerTarget> copyTargets() {
return new ArrayList<>(mirrorControllerTargets);
}
Expand Down Expand Up @@ -267,6 +288,8 @@ public void retryAddress(SimpleString address, LongObjectHashMap<JournalHashMap<
logger.trace("Page Scan not required for address {}", address);
}

checkFlowControlMirrorTargets();

} catch (Throwable e) {
logger.warn(e.getMessage(), e);
} finally {
Expand Down Expand Up @@ -299,7 +322,9 @@ private void validateExpireSet(SimpleString address, long queueID, JournalHashMa
if (logger.isDebugEnabled()) {
logger.debug("Retried {} {} times, giving up on the entry now. Configured Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
}
retries.remove(retry);
if (retries.remove(retry) != null) {
sizeUpdater.decrementAndGet(AckManager.this);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Retry {} attempted {} times on paging, Configuration Page Attempts={}", retry, retry.getPageAttempts(), configuration.getMirrorAckManagerPageAttempts());
Expand Down Expand Up @@ -353,7 +378,9 @@ private void retryPage(LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queu
}
}
}
retries.remove(ackRetry, transaction.getID());
if (retries.remove(ackRetry, transaction.getID()) != null) {
sizeUpdater.decrementAndGet(AckManager.this);
}
transaction.setContainsPersistent();
logger.trace("retry performed ok, ackRetry={} for message={} on queue", ackRetry, pagedMessage);
}
Expand Down Expand Up @@ -389,6 +416,7 @@ private boolean checkRetriesAndPaging(LongObjectHashMap<JournalHashMap<AckRetry,
if (ack(retry.getNodeID(), queue, retry.getMessageID(), retry.getReason(), false)) {
logger.trace("Removing retry {} as the retry went ok", retry);
queueRetries.remove(retry);
sizeUpdater.decrementAndGet(this);
} else {
int retried = retry.attemptedQueue();
if (logger.isTraceEnabled()) {
Expand All @@ -410,6 +438,7 @@ public synchronized void addRetry(String nodeID, Queue queue, long messageID, Ac
}
AckRetry retry = new AckRetry(nodeID, messageID, reason);
journalHashMapProvider.getMap(queue.getID(), queue).put(retry, retry);
sizeUpdater.incrementAndGet(this);
if (scheduledComponent != null) {
// we set the retry delay again in case it was changed.
scheduledComponent.setPeriod(configuration.getMirrorAckManagerRetryDelay());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public static int getSize() {
}

public static AckManager getManager(ActiveMQServer server) {
if (server == null) {
throw new NullPointerException("server is null");
}
synchronized (managerHashMap) {
AckManager ackManager = managerHashMap.get(server);
if (ackManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ public boolean isStarted() {
return state == ReceiverState.STARTED;
}

public boolean isBusy() {
return false;
}

public boolean isStopping() {
return state == ReceiverState.STOPPING;
}
Expand Down Expand Up @@ -276,7 +280,7 @@ public void run() {
if (connection.isHandler()) {
connection.requireInHandler();

if (context.isStarted()) {
if (context.isStarted() && !context.isBusy()) {
final int pending = context.pendingSettles;

if (isBellowThreshold(receiver.getCredit(), pending, threshold)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,10 @@ default boolean isJDBC() {

Configuration setConfigurationFileRefreshPeriod(long configurationFileRefreshPeriod);

int getGlobalMaxSizePercentOfJvmMaxMemory();

ConfigurationImpl setGlobalMaxSizePercentOfJvmMaxMemory(int percentOfJvmMaxMemory);

long getGlobalMaxSize();

Configuration setGlobalMaxSize(long globalMaxSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ public class ConfigurationImpl implements Configuration, Serializable {

private boolean mirrorPageTransaction = ActiveMQDefaultConfiguration.getMirrorPageTransaction();

private int globalMaxSizePercentOfJvmMaxMemory = ActiveMQDefaultConfiguration.DEFAULT_GLOBAL_MAX_MEMORY_PERCENT;

/**
* Parent folder for all data folders.
Expand Down Expand Up @@ -1258,14 +1259,24 @@ public ConfigurationImpl setGlobalMaxSize(long maxSize) {
@Override
public long getGlobalMaxSize() {
if (globalMaxSize == null) {
this.globalMaxSize = ActiveMQDefaultConfiguration.getDefaultMaxGlobalSize();
this.globalMaxSize = ActiveMQDefaultConfiguration.getDefaultMaxGlobalSizeAsPercentOfJvmMaxMemory(getGlobalMaxSizePercentOfJvmMaxMemory());
if (!Env.isTestEnv()) {
ActiveMQServerLogger.LOGGER.usingDefaultPaging(globalMaxSize);
}
}
return globalMaxSize;
}

@Override
public int getGlobalMaxSizePercentOfJvmMaxMemory() {
return globalMaxSizePercentOfJvmMaxMemory;
}

@Override
public ConfigurationImpl setGlobalMaxSizePercentOfJvmMaxMemory(int percentOfJvmMaxMemory) {
this.globalMaxSizePercentOfJvmMaxMemory = percentOfJvmMaxMemory;
return this;
}

@Override
public ConfigurationImpl setGlobalMaxMessages(long maxMessages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {

private static final String GLOBAL_MAX_SIZE = "global-max-size";

private static final String GLOBAL_MAX_SIZE_PERCENT_JVM_MAX_MEM = "global-max-size-percent-of-jvm-max-memory";

private static final String GLOBAL_MAX_MESSAGES = "global-max-messages";

public static final String MAX_DISK_USAGE = "max-disk-usage";
Expand Down Expand Up @@ -505,6 +507,8 @@ public void parseMainConfig(final Element e, final Configuration config) throws

config.setMqttSessionStatePersistenceTimeout(getLong(e, "mqtt-session-state-persistence-timeout", config.getMqttSessionStatePersistenceTimeout(), GT_ZERO));

config.setGlobalMaxSizePercentOfJvmMaxMemory(getInteger(e, GLOBAL_MAX_SIZE_PERCENT_JVM_MAX_MEM, config.getGlobalMaxSizePercentOfJvmMaxMemory(), GT_ZERO));

long globalMaxSize = getTextBytesAsLongBytes(e, GLOBAL_MAX_SIZE, -1, MINUS_ONE_OR_GT_ZERO);

if (globalMaxSize > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,15 @@
<xsd:annotation>
<xsd:documentation>
Number of messages before all addresses will enter into their Full Policy configured.
It works in conjunction with global-max-size, being watever value hits its maximum first.
It works in conjunction with global-max-size, being whatever value hits its maximum first.
</xsd:documentation>
</xsd:annotation>
</xsd:element>

<xsd:element name="global-max-size-percent-of-jvm-max-memory" type="xsd:string" default="50" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Percentage of JVM Runtime Max Memory that should be used by default for global-max-size.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2630,6 +2630,24 @@ public void testExtractPropertyClassName() throws Exception {
assertEquals("foo.class.bar", ConfigurationImpl.extractPropertyClassName("foo.class.bar.class"));
}

@Test
public void testGlobalMaxSizePercentOfJvmMaxMemory() throws Exception {

// existing default
long half = Runtime.getRuntime().maxMemory() / 2;
ConfigurationImpl configuration = new ConfigurationImpl();
assertEquals(half, configuration.getGlobalMaxSize());

configuration.setGlobalMaxSizePercentOfJvmMaxMemory(25);
assertEquals(half, configuration.getGlobalMaxSize());

// needs new instance
configuration = new ConfigurationImpl();
configuration.setGlobalMaxSizePercentOfJvmMaxMemory(25);
long quarter = Runtime.getRuntime().maxMemory() / 4;
assertEquals(quarter, configuration.getGlobalMaxSize());
}

public static class DummyConfig {
private int intProperty;
private int idCacheSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,7 @@ public void testMaxSize() throws Exception {
<core>
<global-max-size>10M</global-max-size>
<global-max-messages>1000</global-max-messages>
<global-max-size-percent-of-jvm-max-memory>30</global-max-size-percent-of-jvm-max-memory>
</core>
</configuration>""");

Expand All @@ -777,6 +778,7 @@ public void testMaxSize() throws Exception {

assertEquals(10 * 1024 * 1024, configuration.getGlobalMaxSize());
assertEquals(1000, configuration.getGlobalMaxMessages());
assertEquals(30, configuration.getGlobalMaxSizePercentOfJvmMaxMemory());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ private void validateFullConfig(Configuration configInstance) {
assertTrue(a2Role.isDeleteNonDurableQueue());
assertFalse(a2Role.isManage());
assertEquals(1234567, configInstance.getGlobalMaxSize());
assertEquals(30, configInstance.getGlobalMaxSizePercentOfJvmMaxMemory());
assertEquals(37, configInstance.getMaxDiskUsage());
assertEquals(123, configInstance.getDiskScanPeriod());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>
<global-max-size>1234567</global-max-size>
<global-max-size-percent-of-jvm-max-memory>30</global-max-size-percent-of-jvm-max-memory>
<max-disk-usage>37</max-disk-usage>
<min-disk-free>500Mb</min-disk-free>
<disk-scan-period>123</disk-scan-period>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>
<global-max-size>1234567</global-max-size>
<global-max-size-percent-of-jvm-max-memory>30</global-max-size-percent-of-jvm-max-memory>
<max-disk-usage>37</max-disk-usage>
<disk-scan-period>123</disk-scan-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<temporary-queue-namespace>TEMP</temporary-queue-namespace>
<global-max-size>1234567</global-max-size>
<global-max-size-percent-of-jvm-max-memory>30</global-max-size-percent-of-jvm-max-memory>
<max-disk-usage>37</max-disk-usage>
<disk-scan-period>123</disk-scan-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ public static void assertTrue(String failureMessage, Condition condition, final
assertTrue(failureMessage, condition, duration, SLEEP_MILLIS);
}

public static <T> T assertNotNull(Supplier<T> supplier, final long duration, final long sleep) throws Exception {
Assertions.assertTrue(waitFor(() -> supplier.get() != null, duration, sleep));
return supplier.get();
}

public static void assertTrue(Condition condition, final long duration, final long sleep) {
assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, sleep);
}
Expand Down
Loading

0 comments on commit 710eda6

Please sign in to comment.