diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 28769295137..9546ab2f953 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -501,7 +501,7 @@ public static String getDefaultHapolicyBackupStrategy() { public static final boolean DEFAULT_AMQP_USE_CORE_SUBSCRIPTION_NAMING = false; - public static final long DEFAULT_GLOBAL_MAX_SIZE = Runtime.getRuntime().maxMemory() / 2; + public static final int DEFAULT_GLOBAL_MAX_MEMORY_PERCENT = 50; public static final long DEFAULT_GLOBAL_MAX_MESSAGES = -1; @@ -1589,11 +1589,8 @@ public static boolean getDefaultAmqpUseCoreSubscriptionNaming() { return DEFAULT_AMQP_USE_CORE_SUBSCRIPTION_NAMING; } - /** - * The default global max size. -1 = no global max size. - */ - public static long getDefaultMaxGlobalSize() { - return DEFAULT_GLOBAL_MAX_SIZE; + public static long getDefaultMaxGlobalSizeAsPercentOfJvmMaxMemory(int percentOfJvmMaxMemory) { + return (long) (Runtime.getRuntime().maxMemory() * (percentOfJvmMaxMemory / 100.0f)); } public static long getDefaultMaxGlobalMessages() { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 52d767c2dfa..af3398f2430 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -112,6 +112,8 @@ public static String getMirrorAddress(String connectionName) { private boolean directDeliver = true; + private int mirrorMaxPendingAcks = 10_000; + private final AMQPRoutingHandler routingHandler; /* @@ -164,6 +166,15 @@ public int getAmqpMinLargeMessageSize() { return amqpMinLargeMessageSize; } + public int getMirrorMaxPendingAcks() { + return mirrorMaxPendingAcks; + } + + public ProtonProtocolManager setMirrorMaxPendingAcks(int maxPendingAcks) { + this.mirrorMaxPendingAcks = maxPendingAcks; + return this; + } + public ProtonProtocolManager setAmqpMinLargeMessageSize(int amqpMinLargeMessageSize) { this.amqpMinLargeMessageSize = amqpMinLargeMessageSize; return this; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index be0895289d0..059d3e91221 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -100,6 +100,27 @@ public static MirrorController getControllerInUse() { return CONTROLLER_THREAD_LOCAL.get(); } + + @Override + public boolean isBusy() { + return getAckManager().size() > getMirrorMaxPendingAcks(); + } + + public int getMirrorMaxPendingAcks() { + try { + return connection.getProtocolManager().getMirrorMaxPendingAcks(); + } catch (Throwable e) { + // It shouldn't happen, but if it did we just log it + logger.warn(e.getMessage(), e); + return 0; + } + } + + public void verifyCredits() { + connection.runNow(creditTopUpRunner); + } + + /** * Objects of this class can be used by either transaction or by OperationContext. It is important that when you're * using the transactions you clear any references to the operation context. Don't use transaction and @@ -473,14 +494,17 @@ private void performAck(String nodeID, logger.trace("performAck (nodeID={}, messageID={}), targetQueue={})", nodeID, messageID, targetQueue.getName()); } + getAckManager().ack(nodeID, targetQueue, messageID, reason, true); + + OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation, OperationConsistencyLevel.FULL); + } + + private AckManager getAckManager() { if (ackManager == null) { ackManager = AckManagerProvider.getManager(server); ackManager.registerMirror(this); } - - ackManager.ack(nodeID, targetQueue, messageID, reason, true); - - OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation, OperationConsistencyLevel.FULL); + return ackManager; } /** diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java index 0faa078a201..5e3b4b744fd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.LongSupplier; import io.netty.util.collection.LongObjectHashMap; @@ -78,6 +79,13 @@ public class AckManager implements ActiveMQComponent { volatile MultiStepProgress progress; ActiveMQScheduledComponent scheduledComponent; + private volatile int size; + private static final AtomicIntegerFieldUpdater sizeUpdater = AtomicIntegerFieldUpdater.newUpdater(AckManager.class, "size"); + + public int size() { + return sizeUpdater.get(this); + } + public AckManager(ActiveMQServer server) { assert server != null && server.getConfiguration() != null; this.server = server; @@ -92,6 +100,7 @@ public AckManager(ActiveMQServer server) { public void reload(RecordInfo recordInfo) { journalHashMapProvider.reload(recordInfo); + sizeUpdater.incrementAndGet(this); } @Override @@ -104,6 +113,13 @@ public synchronized void stop() { logger.debug("Stopping ackmanager on server {}", server); } + public synchronized void pause() { + if (scheduledComponent != null) { + scheduledComponent.stop(); + scheduledComponent = null; + } + } + @Override public synchronized boolean isStarted() { return scheduledComponent != null && scheduledComponent.isStarted(); @@ -188,6 +204,11 @@ private void flushMirrorTargets() { targetCopy.forEach(AMQPMirrorControllerTarget::flush); } + private void checkFlowControlMirrorTargets() { + List targetCopy = copyTargets(); + targetCopy.forEach(AMQPMirrorControllerTarget::verifyCredits); + } + private synchronized List copyTargets() { return new ArrayList<>(mirrorControllerTargets); } @@ -267,6 +288,8 @@ public void retryAddress(SimpleString address, LongObjectHashMap 0) { diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 4137a140f12..1b3698788b8 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -799,7 +799,15 @@ Number of messages before all addresses will enter into their Full Policy configured. - It works in conjunction with global-max-size, being watever value hits its maximum first. + It works in conjunction with global-max-size, being whatever value hits its maximum first. + + + + + + + + Percentage of JVM Runtime Max Memory that should be used by default for global-max-size. diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java index 32830ebe1ab..06f35cd3efc 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java @@ -2630,6 +2630,24 @@ public void testExtractPropertyClassName() throws Exception { assertEquals("foo.class.bar", ConfigurationImpl.extractPropertyClassName("foo.class.bar.class")); } + @Test + public void testGlobalMaxSizePercentOfJvmMaxMemory() throws Exception { + + // existing default + long half = Runtime.getRuntime().maxMemory() / 2; + ConfigurationImpl configuration = new ConfigurationImpl(); + assertEquals(half, configuration.getGlobalMaxSize()); + + configuration.setGlobalMaxSizePercentOfJvmMaxMemory(25); + assertEquals(half, configuration.getGlobalMaxSize()); + + // needs new instance + configuration = new ConfigurationImpl(); + configuration.setGlobalMaxSizePercentOfJvmMaxMemory(25); + long quarter = Runtime.getRuntime().maxMemory() / 4; + assertEquals(quarter, configuration.getGlobalMaxSize()); + } + public static class DummyConfig { private int intProperty; private int idCacheSize; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java index 09bdc01aaef..1e11b121757 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java @@ -768,6 +768,7 @@ public void testMaxSize() throws Exception { 10M 1000 + 30 """); @@ -777,6 +778,7 @@ public void testMaxSize() throws Exception { assertEquals(10 * 1024 * 1024, configuration.getGlobalMaxSize()); assertEquals(1000, configuration.getGlobalMaxMessages()); + assertEquals(30, configuration.getGlobalMaxSizePercentOfJvmMaxMemory()); } @Test diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index be2c322ae4e..ec7a6031c6c 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -646,6 +646,7 @@ private void validateFullConfig(Configuration configInstance) { assertTrue(a2Role.isDeleteNonDurableQueue()); assertFalse(a2Role.isManage()); assertEquals(1234567, configInstance.getGlobalMaxSize()); + assertEquals(30, configInstance.getGlobalMaxSizePercentOfJvmMaxMemory()); assertEquals(37, configInstance.getMaxDiskUsage()); assertEquals(123, configInstance.getDiskScanPeriod()); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index db79760739b..03d54aeacc1 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -62,6 +62,7 @@ 1234567 TEMP 1234567 + 30 37 500Mb 123 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index 40977255a21..33e8f8de607 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -63,6 +63,7 @@ 1234567 TEMP 1234567 + 30 37 123 HALT diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml index c26879b2940..c8b4f352443 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml @@ -63,6 +63,7 @@ 1234567 TEMP 1234567 + 30 37 123 HALT diff --git a/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java b/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java index 0864f5a8bdd..dd287670d84 100644 --- a/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java +++ b/artemis-unit-test-support/src/main/java/org/apache/activemq/artemis/utils/Wait.java @@ -172,6 +172,11 @@ public static void assertTrue(String failureMessage, Condition condition, final assertTrue(failureMessage, condition, duration, SLEEP_MILLIS); } + public static T assertNotNull(Supplier supplier, final long duration, final long sleep) throws Exception { + Assertions.assertTrue(waitFor(() -> supplier.get() != null, duration, sleep)); + return supplier.get(); + } + public static void assertTrue(Condition condition, final long duration, final long sleep) { assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, sleep); } diff --git a/docs/user-manual/paging.adoc b/docs/user-manual/paging.adoc index c5c8537fa9c..ef35aa8bbe3 100644 --- a/docs/user-manual/paging.adoc +++ b/docs/user-manual/paging.adoc @@ -158,7 +158,7 @@ You can also specify `global-max-messages` on the main configuration, specifying When you have more messages than what is configured `global-max-size` any new produced message will make that destination to go through its paging policy. -`global-max-size` is calculated as half of the max memory available to the Java Virtual Machine, unless specified on the `broker.xml` configuration. +`global-max-size` is calculated as a percentage of the max memory available to the Java Virtual Machine, unless specified in the broker.xml configuration directly. The percentage value used is configurable using the option `global-max-size-percent-of-jvm-max-memory` in the XML configuration and if not specified the default value of `global-max-size-percent-of-jvm-max-memory` is 50% or half the max available memory. By default `global-max-messages` = `-1` meaning it's disabled. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java index 650104fd77a..9ad9b364b4a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java @@ -26,10 +26,13 @@ import javax.jms.TopicSubscriber; import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import io.netty.util.collection.LongObjectHashMap; +import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.journal.collections.JournalHashMap; @@ -43,9 +46,11 @@ import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.logs.AssertionLoggerHandler; import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager; import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManagerProvider; @@ -57,17 +62,27 @@ import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_DESTINATION; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; public class AckManagerTest extends ActiveMQTestBase { @@ -86,11 +101,11 @@ public void setUp() throws Exception { server1.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1).setMaxReadPageMessages(20)); server1.getConfiguration().getAcceptorConfigurations().clear(); server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616"); - server1.start(); } @Test public void testDirectACK() throws Throwable { + server1.start(); String protocol = "AMQP"; @@ -289,9 +304,9 @@ public void testDirectACK() throws Throwable { assertEquals(0, AckManagerProvider.getSize()); } - @Test public void testLogUnack() throws Throwable { + server1.start(); String protocol = "AMQP"; SimpleString TOPIC_NAME = SimpleString.of("tp" + RandomUtil.randomUUIDString()); @@ -340,6 +355,7 @@ public void testLogUnack() throws Throwable { @Test public void testRetryFromPaging() throws Throwable { + server1.start(); String protocol = "AMQP"; @@ -440,6 +456,141 @@ public void testRetryFromPaging() throws Throwable { + @Test + public void testFlowControlOnPendingAcks() throws Throwable { + + server1.getConfiguration().getAcceptorConfigurations().clear(); + server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616?mirrorMaxPendingAcks=100&amqpCredits=100"); + server1.start(); + + String protocol = "AMQP"; + + SimpleString QUEUE_NAME = SimpleString.of("queue_" + RandomUtil.randomUUIDString()); + + Queue testQueue = server1.createQueue(QueueConfiguration.of(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST)); + + ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + + // First step... adding messages to a queue // 50% paging 50% queue + try (Connection connection = connectionFactory.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + javax.jms.Queue jmsQueue = session.createQueue(QUEUE_NAME.toString()); + MessageProducer producer = session.createProducer(jmsQueue); + for (int i = 0; i < 50; i++) { + producer.send(session.createTextMessage("hello there " + i)); + } + session.commit(); + + testQueue.getPagingStore().startPaging(); + + for (int i = 0; i < 50; i++) { + producer.send(session.createTextMessage("hello there " + i)); + } + session.commit(); + } + + Wait.assertEquals(100, testQueue::getMessageCount); + + AckManager ackManager = AckManagerProvider.getManager(server1); + assertTrue(ackManager.isStarted()); + ackManager.pause(); + assertFalse(ackManager.isStarted()); + assertSame(ackManager, AckManagerProvider.getManager(server1)); + + // adding fake retries to flood the manager beyond capacity + addFakeRetries(ackManager, testQueue, 1000); + + AmqpClient client = new AmqpClient(new URI("tcp://localhost:61616"), null, null); + AmqpConnection connection = client.connect(); + runAfter(connection::close); + AmqpSession session = connection.createSession(); + + Map properties = new HashMap<>(); + properties.put(AMQPMirrorControllerSource.BROKER_ID, "whatever"); + + AmqpSender sender = session.createSender(QueueImpl.MIRROR_ADDRESS, true, new Symbol[]{Symbol.getSymbol("amq.mirror")}, new Symbol[]{Symbol.getSymbol("amq.mirror")}, properties); + + AMQPMirrorControllerTarget mirrorControllerTarget = Wait.assertNotNull(() -> locateMirrorTarget(server1), 5000, 100); + assertEquals(100, mirrorControllerTarget.getConnection().getProtocolManager().getMirrorMaxPendingAcks()); + assertTrue(mirrorControllerTarget.isBusy()); + // first connection it should be beyond flow control capacity, we should not have any credits here now + assertEquals(0, sender.getEndpoint().getCredit()); + ackManager.start(); + + // manager resumed and the records will be eventually removed, we should be back to capacity + Wait.assertEquals(100, () -> sender.getEndpoint().getCredit(), 5000, 100); + ackManager.pause(); + + addFakeRetries(ackManager, testQueue, 1000); + assertEquals(1000, ackManager.size()); + + // we should be able to send 100 messages + for (int i = 0; i < 100; i++) { + AmqpMessage message = new AmqpMessage(); + message.setAddress(testQueue.getAddress().toString()); + message.setText("hello again " + i); + message.setDeliveryAnnotation(INTERNAL_ID.toString(), server1.getStorageManager().generateID()); + message.setDeliveryAnnotation(INTERNAL_DESTINATION.toString(), testQueue.getAddress().toString()); + sender.send(message); + } + // we should not get any credits + assertEquals(0, sender.getEndpoint().getCredit()); + + Wait.assertEquals(200, testQueue::getMessageCount); + + ackManager.start(); + // after restart we should eventually get replenished on credits + Wait.assertEquals(100, () -> sender.getEndpoint().getCredit(), 5000, 100); + + Wait.assertEquals(200L, testQueue::getMessageCount, 5000, 100); + + AtomicInteger acked = new AtomicInteger(0); + ackManager.pause(); + // Adding hot data... we should be able to flow credits during that + testQueue.forEach(ref -> { + long messageID = ref.getMessageID(); + + Long internalID = (Long) ref.getMessage().getAnnotation(AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY); + String nodeId = (String) ref.getMessage().getAnnotation(AMQPMirrorControllerSource.BROKER_ID_SIMPLE_STRING); + if (internalID != null) { + messageID = internalID.longValue(); + } + ackManager.addRetry(nodeId, testQueue, messageID, AckReason.NORMAL); + acked.incrementAndGet(); + }); + + assertEquals(200, acked.get()); + ackManager.start(); + + for (int i = 0; i < 100; i++) { + AmqpMessage message = new AmqpMessage(); + message.setAddress(testQueue.getAddress().toString()); + message.setText("hello"); + message.setDeliveryAnnotation(INTERNAL_ID.toString(), server1.getStorageManager().generateID()); + message.setDeliveryAnnotation(INTERNAL_DESTINATION.toString(), testQueue.getAddress().toString()); + sender.send(message); + } + + Wait.assertTrue(() -> sender.getEndpoint().getCredit() > 0, 5000, 100); + + // we acked 200 messages through the retry + Wait.assertEquals(100L, testQueue::getMessageCount, 5000, 100); + + ackManager.stop(); + + connection.close(); + + server1.stop(); + + assertEquals(0, AckManagerProvider.getSize()); + } + + private void addFakeRetries(AckManager ackManager, Queue testQueue, int size) { + for (int i = 0; i < size; i++) { + // adding retries that will never succeed, just to fillup the storage hashmap + ackManager.addRetry(null, testQueue, server1.getStorageManager().generateID(), AckReason.NORMAL); + } + } private int getCounter(byte typeRecord, Map values) { AtomicInteger value = values.get((int) typeRecord); diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java index 04991e6ef2c..ca6cde0225f 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java @@ -38,6 +38,7 @@ import org.apache.activemq.artemis.tests.soak.SoakTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.util.ServerUtil; +import org.apache.activemq.artemis.utils.FileUtil; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.artemis.cli.commands.helper.HelperCreate; @@ -109,6 +110,10 @@ private static void createServer(String serverName, File brokerPropertiesFile = new File(serverLocation, "broker.properties"); saveProperties(brokerProperties, brokerPropertiesFile); + + File brokerXML = new File(serverLocation, "/etc/broker.xml"); + // Making sure we flow control mirrorACK on the lower side to make sure things are working + assertTrue(FileUtil.findReplace(brokerXML, "", ";mirrorMaxPendingAcks=50")); } @BeforeAll