From 27f76f569c4bbc54f3688ef73f61f903fe3ae62d Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Mon, 3 Mar 2025 12:04:43 -0600 Subject: [PATCH] ARTEMIS-5053 support commit interval for scale-down This commit adds support for configuring how often the transactions governing scale-down are committed. It also updates HAPolicy with default implementations which helps clarify which policies really care about scale-down. --- .../config/ActiveMQDefaultConfiguration.java | 10 ++ .../core/config/ConfigurationUtils.java | 4 +- .../core/config/ScaleDownConfiguration.java | 11 ++ .../impl/FileConfigurationParser.java | 2 + .../core/server/cluster/ha/BackupPolicy.java | 5 - .../server/cluster/ha/ColocatedPolicy.java | 10 -- .../core/server/cluster/ha/HAPolicy.java | 12 +- .../server/cluster/ha/PrimaryOnlyPolicy.java | 5 + .../server/cluster/ha/ReplicatedPolicy.java | 10 -- .../cluster/ha/ReplicationBackupPolicy.java | 10 -- .../cluster/ha/ReplicationPrimaryPolicy.java | 10 -- .../server/cluster/ha/ScaleDownPolicy.java | 16 ++- .../cluster/ha/SharedStorePrimaryPolicy.java | 10 -- .../impl/BackupRecoveryJournalLoader.java | 2 +- .../server/impl/PrimaryOnlyActivation.java | 6 +- .../core/server/impl/ScaleDownHandler.java | 13 ++- .../schema/artemis-configuration.xsd | 8 ++ .../config/impl/ConfigurationImplTest.java | 2 + .../impl/FileConfigurationParserTest.java | 2 + .../config/impl/FileConfigurationTest.java | 1 + .../ConfigurationTest-full-config.xml | 1 + .../ConfigurationTest-xinclude-config.xml | 1 + ...nTest-xinclude-schema-config-ha-policy.xml | 1 + docs/user-manual/ha.adoc | 13 +++ .../server/ScaleDownCommitIntervalTest.java | 103 ++++++++++++++++++ .../server/ScaleDownDirectTest.java | 2 +- 26 files changed, 203 insertions(+), 67 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownCommitIntervalTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 18a76a0e193..28769295137 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -440,6 +440,9 @@ public static String getDefaultHapolicyBackupStrategy() { // its possible that you only want a server to partake in scale down as a receiver, via a group. In this case set scale-down to false private static boolean DEFAULT_SCALE_DOWN_ENABLED = true; + // How often to commit transactions for moving messages during scale-down + private static int DEFAULT_SCALE_DOWN_COMMIT_INTERVAL = -1; + // How long to wait for a decision private static int DEFAULT_GROUPING_HANDLER_TIMEOUT = 5000; @@ -1471,6 +1474,13 @@ public static boolean isDefaultScaleDownEnabled() { return DEFAULT_SCALE_DOWN_ENABLED; } + /** + * How often to commit transactions for moving messages during scale-down + */ + public static int getDefaultScaleDownCommitInterval() { + return DEFAULT_SCALE_DOWN_COMMIT_INTERVAL; + } + /** * How long to wait for a decision */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java index a8ff38aa204..8b9494b8c9f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java @@ -135,9 +135,9 @@ public static HAPolicy getHAPolicy(HAPolicyConfiguration conf, public static ScaleDownPolicy getScaleDownPolicy(ScaleDownConfiguration scaleDownConfiguration) { if (scaleDownConfiguration != null) { if (scaleDownConfiguration.getDiscoveryGroup() != null) { - return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled()); + return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.getCommitInterval()); } else { - return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled()); + return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.getCommitInterval()); } } return null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java index 5f58e36bdca..848c54ae1d8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java @@ -34,6 +34,8 @@ public class ScaleDownConfiguration implements Serializable { private boolean enabled = ActiveMQDefaultConfiguration.isDefaultScaleDownEnabled(); + private int commitInterval = ActiveMQDefaultConfiguration.getDefaultScaleDownCommitInterval(); + public List getConnectors() { return connectors; } @@ -83,4 +85,13 @@ public ScaleDownConfiguration setEnabled(boolean enabled) { this.enabled = enabled; return this; } + + public int getCommitInterval() { + return commitInterval; + } + + public ScaleDownConfiguration setCommitInterval(int commitInterval) { + this.commitInterval = commitInterval; + return this; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 5e5a0dfa516..48c04105aeb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1952,6 +1952,8 @@ private ScaleDownConfiguration parseScaleDownConfig(Element policyNode) { scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement, "enabled", scaleDownConfiguration.isEnabled())); + scaleDownConfiguration.setCommitInterval(getInteger(scaleDownElement, "commit-interval", scaleDownConfiguration.getCommitInterval(), MINUS_ONE_OR_GT_ZERO)); + NodeList discoveryGroupRef = scaleDownElement.getElementsByTagName("discovery-group-ref"); if (discoveryGroupRef.item(0) != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java index 51f5c050fbe..3acf7eb5746 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java @@ -37,11 +37,6 @@ public boolean isBackup() { return true; } - @Override - public String getScaleDownClustername() { - return null; - } - @Override public String getScaleDownGroupName() { return getScaleDownPolicy() != null ? getScaleDownPolicy().getGroupName() : null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java index b4859e48164..8fff208b58b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java @@ -73,11 +73,6 @@ public String getBackupGroupName() { return primaryPolicy.getBackupGroupName(); } - @Override - public String getScaleDownGroupName() { - return null; - } - @Override public boolean isSharedStore() { return backupPolicy.isSharedStore(); @@ -101,11 +96,6 @@ public boolean canScaleDown() { return false; } - @Override - public String getScaleDownClustername() { - return null; - } - public boolean isRequestBackup() { return requestBackup; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java index bc7a6e1462f..f98337f73f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java @@ -52,9 +52,17 @@ default boolean isWaitForActivation() { String getBackupGroupName(); - String getScaleDownGroupName(); + default String getScaleDownGroupName() { + return null; + } + + default String getScaleDownClustername() { + return null; + } - String getScaleDownClustername(); + default int getScaleDownCommitInterval() { + return -1; + } default boolean useQuorumManager() { return true; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/PrimaryOnlyPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/PrimaryOnlyPolicy.java index 9fab18f3aba..a98989d0430 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/PrimaryOnlyPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/PrimaryOnlyPolicy.java @@ -57,6 +57,11 @@ public String getScaleDownClustername() { return null; } + @Override + public int getScaleDownCommitInterval() { + return scaleDownPolicy == null ? -1 : scaleDownPolicy.getCommitInterval(); + } + @Override public boolean isSharedStore() { return false; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java index 6f83d2a8e86..1f21bd7a709 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java @@ -187,11 +187,6 @@ public String getGroupName() { return groupName; } - @Override - public String getScaleDownGroupName() { - return null; - } - public void setGroupName(String groupName) { this.groupName = groupName; } @@ -211,11 +206,6 @@ public boolean canScaleDown() { return false; } - @Override - public String getScaleDownClustername() { - return null; - } - public void setAllowAutoFailBack(boolean allowAutoFailBack) { this.allowAutoFailBack = allowAutoFailBack; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationBackupPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationBackupPolicy.java index 50080d2e09e..f5484408473 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationBackupPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationBackupPolicy.java @@ -117,16 +117,6 @@ public boolean canScaleDown() { return false; } - @Override - public String getScaleDownGroupName() { - return null; - } - - @Override - public String getScaleDownClustername() { - return null; - } - public String getClusterName() { return clusterName; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationPrimaryPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationPrimaryPolicy.java index 8d7af9cf51a..6a611cd3c1f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationPrimaryPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicationPrimaryPolicy.java @@ -128,16 +128,6 @@ public String getBackupGroupName() { return groupName; } - @Override - public String getScaleDownGroupName() { - return null; - } - - @Override - public String getScaleDownClustername() { - return null; - } - public boolean isAllowAutoFailBack() { return allowAutoFailBack; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java index 0ef96d53220..e50a8c32487 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java @@ -41,21 +41,25 @@ public class ScaleDownPolicy { private boolean enabled; + private int commitInterval; + public ScaleDownPolicy() { } - public ScaleDownPolicy(List connectors, String groupName, String clusterName, boolean enabled) { + public ScaleDownPolicy(List connectors, String groupName, String clusterName, boolean enabled, int commitInterval) { this.connectors = connectors; this.groupName = groupName; this.clusterName = clusterName; this.enabled = enabled; + this.commitInterval = commitInterval; } - public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled) { + public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled, int commitInterval) { this.discoveryGroup = discoveryGroup; this.groupName = groupName; this.clusterName = clusterName; this.enabled = enabled; + this.commitInterval = commitInterval; } public List getConnectors() { @@ -98,6 +102,14 @@ public void setEnabled(boolean enabled) { this.enabled = enabled; } + public int getCommitInterval() { + return commitInterval; + } + + public void setCommitInterval(int commitInterval) { + this.commitInterval = commitInterval; + } + public static ServerLocatorInternal getScaleDownConnector(ScaleDownPolicy scaleDownPolicy, ActiveMQServer activeMQServer) throws ActiveMQException { if (!scaleDownPolicy.getConnectors().isEmpty()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStorePrimaryPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStorePrimaryPolicy.java index 0b5eecceafa..b5cdf2e9af7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStorePrimaryPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStorePrimaryPolicy.java @@ -100,14 +100,4 @@ public PrimaryActivation createActivation(ActiveMQServerImpl server, public String getBackupGroupName() { return null; } - - @Override - public String getScaleDownGroupName() { - return null; - } - - @Override - public String getScaleDownClustername() { - return null; - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java index 86476a241f0..4a39855aed9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java @@ -92,7 +92,7 @@ public void handleDuplicateIds(Map>> dupli public void postLoad(Journal messageJournal, ResourceManager resourceManager, Map>> duplicateIDMap) throws Exception { - ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer != null ? parentServer.getStorageManager() : storageManager); + ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer != null ? parentServer.getStorageManager() : storageManager, parentServer != null ? parentServer.getHAPolicy().getScaleDownCommitInterval() : -1); locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, storageManager)); try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PrimaryOnlyActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PrimaryOnlyActivation.java index 903a1ec919a..0606f60a71d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PrimaryOnlyActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PrimaryOnlyActivation.java @@ -127,7 +127,7 @@ public void freezeConnections(RemotingService remotingService) { public void postConnectionFreeze() { if (primaryOnlyPolicy.getScaleDownPolicy() != null && primaryOnlyPolicy.getScaleDownPolicy().isEnabled() && scaleDownClientSessionFactory != null) { try { - scaleDown(); + scaleDown(primaryOnlyPolicy.getScaleDownPolicy().getCommitInterval()); } catch (Exception e) { ActiveMQServerLogger.LOGGER.failedToScaleDown(e); } finally { @@ -190,8 +190,8 @@ public void connectToScaleDownTarget(ScaleDownPolicy scaleDownPolicy) { } } - public long scaleDown() throws Exception { - ScaleDownHandler scaleDownHandler = new ScaleDownHandler(activeMQServer.getPagingManager(), activeMQServer.getPostOffice(), activeMQServer.getNodeManager(), activeMQServer.getClusterManager().getClusterController(), activeMQServer.getStorageManager()); + public long scaleDown(int commitInterval) throws Exception { + ScaleDownHandler scaleDownHandler = new ScaleDownHandler(activeMQServer.getPagingManager(), activeMQServer.getPostOffice(), activeMQServer.getNodeManager(), activeMQServer.getClusterManager().getClusterController(), activeMQServer.getStorageManager(), commitInterval); ConcurrentMap duplicateIDCaches = ((PostOfficeImpl) activeMQServer.getPostOffice()).getDuplicateIDCaches(); Map>> duplicateIDMap = new HashMap<>(); for (SimpleString address : duplicateIDCaches.keySet()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java index 2ffe947afbc..c45a8ae9492 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java @@ -76,18 +76,21 @@ public class ScaleDownHandler { private NodeManager nodeManager; private final ClusterController clusterController; private final StorageManager storageManager; + private final int commitInterval; private String targetNodeId; public ScaleDownHandler(PagingManager pagingManager, PostOffice postOffice, NodeManager nodeManager, ClusterController clusterController, - StorageManager storageManager) { + StorageManager storageManager, + int commitInterval) { this.pagingManager = pagingManager; this.postOffice = postOffice; this.nodeManager = nodeManager; this.clusterController = clusterController; this.storageManager = storageManager; + this.commitInterval = commitInterval; } public long scaleDown(ClientSessionFactory sessionFactory, @@ -214,6 +217,10 @@ public long scaleDownRegularMessages(final SimpleString address, producer.send(address, message); messageCount++; + if (commitInterval > 0 && messageCount % commitInterval == 0) { + tx.commit(); + tx = new TransactionImpl(storageManager); + } messagesIterator.remove(); @@ -307,6 +314,10 @@ private long scaleDownSNF(final SimpleString address, producer.send(message.getAddress(), message); messageCount++; + if (commitInterval > 0 && messageCount % commitInterval == 0) { + tx.commit(); + tx = new TransactionImpl(storageManager); + } messagesIterator.remove(); diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index db1e93eba04..4137a140f12 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -3714,6 +3714,14 @@ + + + + How often to commit when scaling messages down from one broker to another. + -1 means commit only after processing all the messages from a queue. + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java index 6b9fef9cecc..9e019323aef 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java @@ -953,6 +953,7 @@ private void addScaleDownConfigurationProperties(Properties properties) { properties.put("HAPolicyConfiguration.scaleDownConfiguration.groupName", "g0"); properties.put("HAPolicyConfiguration.scaleDownConfiguration.clusterName", "c0"); properties.put("HAPolicyConfiguration.scaleDownConfiguration.enabled", "false"); + properties.put("HAPolicyConfiguration.scaleDownConfiguration.commitInterval", "73"); } private void checkScaleDownConfiguration(ScaleDownConfiguration scaleDownConfiguration) { @@ -962,6 +963,7 @@ private void checkScaleDownConfiguration(ScaleDownConfiguration scaleDownConfigu assertEquals("g0", scaleDownConfiguration.getGroupName()); assertEquals("c0", scaleDownConfiguration.getClusterName()); assertFalse(scaleDownConfiguration.isEnabled()); + assertEquals(73, scaleDownConfiguration.getCommitInterval()); } @Test diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java index 2003207dc80..09bdc01aaef 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java @@ -491,6 +491,7 @@ public void testParsingScaleDownConfig() throws Exception { server0-connector + 33 @@ -509,6 +510,7 @@ public void testParsingScaleDownConfig() throws Exception { assertEquals(1, connectors.size()); String connector = connectors.get(0); assertEquals("server0-connector", connector); + assertEquals(33, scaledownCfg.getCommitInterval()); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 5f82220ad6a..c8ff8202154 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -451,6 +451,7 @@ public void testFileConfiguration() { assertNotNull(lopc.getScaleDownConfiguration()); assertEquals("boo!", lopc.getScaleDownConfiguration().getGroupName()); assertEquals("dg1", lopc.getScaleDownConfiguration().getDiscoveryGroup()); + assertEquals(33, lopc.getScaleDownConfiguration().getCommitInterval()); for (ClusterConnectionConfiguration ccc : conf.getClusterConfigurations()) { if (ccc.getName().equals("cluster-connection3")) { diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index ad9e0b1a295..543b2e96efc 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -413,6 +413,7 @@ boo! + 33 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index 99fac28fcad..40977255a21 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -301,6 +301,7 @@ boo! + 33 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-ha-policy.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-ha-policy.xml index 5913f091976..5d8ae2b1de2 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-ha-policy.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-ha-policy.xml @@ -23,6 +23,7 @@ boo! + 33 diff --git a/docs/user-manual/ha.adoc b/docs/user-manual/ha.adoc index 77d15c59272..7cae149fe89 100644 --- a/docs/user-manual/ha.adoc +++ b/docs/user-manual/ha.adoc @@ -836,6 +836,19 @@ It is also possible to use discovery to scale down, this would look like: ---- +[NOTE] +==== +Moving messages from one broker to another during scale-down involves an internal transaction. +By default this transaction is only committed once per queue. +However, as the number of messages in the queue grows so does the memory requirements for the transaction. +At some point the memory requirements for the transaction will exceed the limits of the available heap. + +In order to deal with this you can configure the `commit-interval` in the `scale-down` element. +This will allow the transaction to be committed every so often which will free the memory from the transaction. +It must be greater than `0` or `-1`. +It is `-1` by default (i.e. don't commit until all the messages in the queue are scaled-down). +==== + === Scale Down with groups It is also possible to configure servers to only scale down to servers that belong in the same group. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownCommitIntervalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownCommitIntervalTest.java new file mode 100644 index 00000000000..212ae656347 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownCommitIntervalTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.server; + +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.core.server.impl.ScaleDownHandler; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class ScaleDownCommitIntervalTest extends ClusterTestBase { + final int TEST_SIZE = 1000; + + @Override + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + setupPrimaryServer(0, isFileStorage(), true, true); + setupPrimaryServer(1, isFileStorage(), true, true); + startServers(0, 1); + setupSessionFactory(0, true); + setupSessionFactory(1, true); + } + + @Test + public void testSmallCommitInterval() throws Exception { + testCommitInterval(1); + } + + @Test + public void testMediumCommitInterval() throws Exception { + testCommitInterval((int) (TEST_SIZE * 0.33)); + } + + @Test + public void testLargeCommitInterval() throws Exception { + testCommitInterval((int) (TEST_SIZE * 0.66)); + } + + @Test + public void testMaxCommitInterval() throws Exception { + testCommitInterval(-1); + } + + private void testCommitInterval(int commitInterval) throws Exception { + final String addressName = "testAddress"; + final String queueName1 = "testQueue1"; + final String queueName2 = "testQueue2"; + + // create 2 queues on each node mapped to the same address + createQueue(0, addressName, queueName1, null, true); + createQueue(0, addressName, queueName2, null, true); + createQueue(1, addressName, queueName1, null, true); + createQueue(1, addressName, queueName2, null, true); + + // send messages to node 0 + send(0, addressName, TEST_SIZE, true, null); + + // consume a message from queue 2 + addConsumer(1, 0, queueName2, null, false); + ClientMessage clientMessage = consumers[1].getConsumer().receive(250); + assertNotNull(clientMessage); + clientMessage.acknowledge(); + consumers[1].getSession().commit(); + removeConsumer(1); + + Wait.assertEquals((long) TEST_SIZE, () -> servers[0].locateQueue(queueName1).getMessageCount(), 500, 20); + Wait.assertEquals((long) TEST_SIZE - 1, () -> servers[0].locateQueue(queueName2).getMessageCount(), 500, 20); + + assertEquals((long) TEST_SIZE, performScaledown(commitInterval)); + + // trigger scaleDown from node 0 to node 1 + servers[0].stop(); + + Wait.assertEquals((long) TEST_SIZE, () -> servers[1].locateQueue(queueName1).getMessageCount(), 500, 20); + Wait.assertEquals((long) TEST_SIZE - 1, () -> servers[1].locateQueue(queueName2).getMessageCount(), 500, 20); + } + + private long performScaledown(int commitInterval) throws Exception { + ScaleDownHandler handler = new ScaleDownHandler(servers[0].getPagingManager(), servers[0].getPostOffice(), servers[0].getNodeManager(), servers[0].getClusterManager().getClusterController(), servers[0].getStorageManager(), commitInterval); + + return handler.scaleDownMessages(sfs[1], servers[1].getNodeID(), servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword()); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java index ec81070ad02..000726b001c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDirectTest.java @@ -374,7 +374,7 @@ private void checkBody(ClientMessage message, int bufferSize) { } private long performScaledown() throws Exception { - ScaleDownHandler handler = new ScaleDownHandler(servers[0].getPagingManager(), servers[0].getPostOffice(), servers[0].getNodeManager(), servers[0].getClusterManager().getClusterController(), servers[0].getStorageManager()); + ScaleDownHandler handler = new ScaleDownHandler(servers[0].getPagingManager(), servers[0].getPostOffice(), servers[0].getNodeManager(), servers[0].getClusterManager().getClusterController(), servers[0].getStorageManager(), -1); return handler.scaleDownMessages(sfs[1], servers[1].getNodeID(), servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword()); }