Skip to content

Commit

Permalink
ARTEMIS-5053 support commit interval for scale-down
Browse files Browse the repository at this point in the history
This commit adds support for configuring how often the transactions
governing scale-down are committed. It also updates HAPolicy with
default implementations which helps clarify which policies really care
about scale-down.
  • Loading branch information
jbertram authored and clebertsuconic committed Mar 3, 2025
1 parent c0e4cb3 commit 27f76f5
Show file tree
Hide file tree
Showing 26 changed files with 203 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,9 @@ public static String getDefaultHapolicyBackupStrategy() {
// its possible that you only want a server to partake in scale down as a receiver, via a group. In this case set scale-down to false
private static boolean DEFAULT_SCALE_DOWN_ENABLED = true;

// How often to commit transactions for moving messages during scale-down
private static int DEFAULT_SCALE_DOWN_COMMIT_INTERVAL = -1;

// How long to wait for a decision
private static int DEFAULT_GROUPING_HANDLER_TIMEOUT = 5000;

Expand Down Expand Up @@ -1471,6 +1474,13 @@ public static boolean isDefaultScaleDownEnabled() {
return DEFAULT_SCALE_DOWN_ENABLED;
}

/**
* How often to commit transactions for moving messages during scale-down
*/
public static int getDefaultScaleDownCommitInterval() {
return DEFAULT_SCALE_DOWN_COMMIT_INTERVAL;
}

/**
* How long to wait for a decision
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ public static HAPolicy getHAPolicy(HAPolicyConfiguration conf,
public static ScaleDownPolicy getScaleDownPolicy(ScaleDownConfiguration scaleDownConfiguration) {
if (scaleDownConfiguration != null) {
if (scaleDownConfiguration.getDiscoveryGroup() != null) {
return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled());
return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.getCommitInterval());
} else {
return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled());
return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.getCommitInterval());
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class ScaleDownConfiguration implements Serializable {

private boolean enabled = ActiveMQDefaultConfiguration.isDefaultScaleDownEnabled();

private int commitInterval = ActiveMQDefaultConfiguration.getDefaultScaleDownCommitInterval();

public List<String> getConnectors() {
return connectors;
}
Expand Down Expand Up @@ -83,4 +85,13 @@ public ScaleDownConfiguration setEnabled(boolean enabled) {
this.enabled = enabled;
return this;
}

public int getCommitInterval() {
return commitInterval;
}

public ScaleDownConfiguration setCommitInterval(int commitInterval) {
this.commitInterval = commitInterval;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1952,6 +1952,8 @@ private ScaleDownConfiguration parseScaleDownConfig(Element policyNode) {

scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement, "enabled", scaleDownConfiguration.isEnabled()));

scaleDownConfiguration.setCommitInterval(getInteger(scaleDownElement, "commit-interval", scaleDownConfiguration.getCommitInterval(), MINUS_ONE_OR_GT_ZERO));

NodeList discoveryGroupRef = scaleDownElement.getElementsByTagName("discovery-group-ref");

if (discoveryGroupRef.item(0) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ public boolean isBackup() {
return true;
}

@Override
public String getScaleDownClustername() {
return null;
}

@Override
public String getScaleDownGroupName() {
return getScaleDownPolicy() != null ? getScaleDownPolicy().getGroupName() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ public String getBackupGroupName() {
return primaryPolicy.getBackupGroupName();
}

@Override
public String getScaleDownGroupName() {
return null;
}

@Override
public boolean isSharedStore() {
return backupPolicy.isSharedStore();
Expand All @@ -101,11 +96,6 @@ public boolean canScaleDown() {
return false;
}

@Override
public String getScaleDownClustername() {
return null;
}

public boolean isRequestBackup() {
return requestBackup;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,17 @@ default boolean isWaitForActivation() {

String getBackupGroupName();

String getScaleDownGroupName();
default String getScaleDownGroupName() {
return null;
}

default String getScaleDownClustername() {
return null;
}

String getScaleDownClustername();
default int getScaleDownCommitInterval() {
return -1;
}

default boolean useQuorumManager() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public String getScaleDownClustername() {
return null;
}

@Override
public int getScaleDownCommitInterval() {
return scaleDownPolicy == null ? -1 : scaleDownPolicy.getCommitInterval();
}

@Override
public boolean isSharedStore() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,6 @@ public String getGroupName() {
return groupName;
}

@Override
public String getScaleDownGroupName() {
return null;
}

public void setGroupName(String groupName) {
this.groupName = groupName;
}
Expand All @@ -211,11 +206,6 @@ public boolean canScaleDown() {
return false;
}

@Override
public String getScaleDownClustername() {
return null;
}

public void setAllowAutoFailBack(boolean allowAutoFailBack) {
this.allowAutoFailBack = allowAutoFailBack;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,6 @@ public boolean canScaleDown() {
return false;
}

@Override
public String getScaleDownGroupName() {
return null;
}

@Override
public String getScaleDownClustername() {
return null;
}

public String getClusterName() {
return clusterName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,6 @@ public String getBackupGroupName() {
return groupName;
}

@Override
public String getScaleDownGroupName() {
return null;
}

@Override
public String getScaleDownClustername() {
return null;
}

public boolean isAllowAutoFailBack() {
return allowAutoFailBack;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,25 @@ public class ScaleDownPolicy {

private boolean enabled;

private int commitInterval;

public ScaleDownPolicy() {
}

public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled) {
public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled, int commitInterval) {
this.connectors = connectors;
this.groupName = groupName;
this.clusterName = clusterName;
this.enabled = enabled;
this.commitInterval = commitInterval;
}

public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled) {
public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled, int commitInterval) {
this.discoveryGroup = discoveryGroup;
this.groupName = groupName;
this.clusterName = clusterName;
this.enabled = enabled;
this.commitInterval = commitInterval;
}

public List<String> getConnectors() {
Expand Down Expand Up @@ -98,6 +102,14 @@ public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public int getCommitInterval() {
return commitInterval;
}

public void setCommitInterval(int commitInterval) {
this.commitInterval = commitInterval;
}

public static ServerLocatorInternal getScaleDownConnector(ScaleDownPolicy scaleDownPolicy,
ActiveMQServer activeMQServer) throws ActiveMQException {
if (!scaleDownPolicy.getConnectors().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,4 @@ public PrimaryActivation createActivation(ActiveMQServerImpl server,
public String getBackupGroupName() {
return null;
}

@Override
public String getScaleDownGroupName() {
return null;
}

@Override
public String getScaleDownClustername() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void handleDuplicateIds(Map<SimpleString, List<Pair<byte[], Long>>> dupli
public void postLoad(Journal messageJournal,
ResourceManager resourceManager,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer != null ? parentServer.getStorageManager() : storageManager);
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer != null ? parentServer.getStorageManager() : storageManager, parentServer != null ? parentServer.getHAPolicy().getScaleDownCommitInterval() : -1);
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator, storageManager));

try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void freezeConnections(RemotingService remotingService) {
public void postConnectionFreeze() {
if (primaryOnlyPolicy.getScaleDownPolicy() != null && primaryOnlyPolicy.getScaleDownPolicy().isEnabled() && scaleDownClientSessionFactory != null) {
try {
scaleDown();
scaleDown(primaryOnlyPolicy.getScaleDownPolicy().getCommitInterval());
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.failedToScaleDown(e);
} finally {
Expand Down Expand Up @@ -190,8 +190,8 @@ public void connectToScaleDownTarget(ScaleDownPolicy scaleDownPolicy) {
}
}

public long scaleDown() throws Exception {
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(activeMQServer.getPagingManager(), activeMQServer.getPostOffice(), activeMQServer.getNodeManager(), activeMQServer.getClusterManager().getClusterController(), activeMQServer.getStorageManager());
public long scaleDown(int commitInterval) throws Exception {
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(activeMQServer.getPagingManager(), activeMQServer.getPostOffice(), activeMQServer.getNodeManager(), activeMQServer.getClusterManager().getClusterController(), activeMQServer.getStorageManager(), commitInterval);
ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = ((PostOfficeImpl) activeMQServer.getPostOffice()).getDuplicateIDCaches();
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>();
for (SimpleString address : duplicateIDCaches.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,21 @@ public class ScaleDownHandler {
private NodeManager nodeManager;
private final ClusterController clusterController;
private final StorageManager storageManager;
private final int commitInterval;
private String targetNodeId;

public ScaleDownHandler(PagingManager pagingManager,
PostOffice postOffice,
NodeManager nodeManager,
ClusterController clusterController,
StorageManager storageManager) {
StorageManager storageManager,
int commitInterval) {
this.pagingManager = pagingManager;
this.postOffice = postOffice;
this.nodeManager = nodeManager;
this.clusterController = clusterController;
this.storageManager = storageManager;
this.commitInterval = commitInterval;
}

public long scaleDown(ClientSessionFactory sessionFactory,
Expand Down Expand Up @@ -214,6 +217,10 @@ public long scaleDownRegularMessages(final SimpleString address,

producer.send(address, message);
messageCount++;
if (commitInterval > 0 && messageCount % commitInterval == 0) {
tx.commit();
tx = new TransactionImpl(storageManager);
}

messagesIterator.remove();

Expand Down Expand Up @@ -307,6 +314,10 @@ private long scaleDownSNF(final SimpleString address,
producer.send(message.getAddress(), message);

messageCount++;
if (commitInterval > 0 && messageCount % commitInterval == 0) {
tx.commit();
tx = new TransactionImpl(storageManager);
}

messagesIterator.remove();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3714,6 +3714,14 @@
</xsd:complexType>
</xsd:element>
</xsd:choice>
<xsd:element name="commit-interval" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
How often to commit when scaling messages down from one broker to another.
-1 means commit only after processing all the messages from a queue.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,7 @@ private void addScaleDownConfigurationProperties(Properties properties) {
properties.put("HAPolicyConfiguration.scaleDownConfiguration.groupName", "g0");
properties.put("HAPolicyConfiguration.scaleDownConfiguration.clusterName", "c0");
properties.put("HAPolicyConfiguration.scaleDownConfiguration.enabled", "false");
properties.put("HAPolicyConfiguration.scaleDownConfiguration.commitInterval", "73");
}

private void checkScaleDownConfiguration(ScaleDownConfiguration scaleDownConfiguration) {
Expand All @@ -962,6 +963,7 @@ private void checkScaleDownConfiguration(ScaleDownConfiguration scaleDownConfigu
assertEquals("g0", scaleDownConfiguration.getGroupName());
assertEquals("c0", scaleDownConfiguration.getClusterName());
assertFalse(scaleDownConfiguration.isEnabled());
assertEquals(73, scaleDownConfiguration.getCommitInterval());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ public void testParsingScaleDownConfig() throws Exception {
<scale-down>
<connectors>
<connector-ref>server0-connector</connector-ref>
<commit-interval>33</commit-interval>
</connectors>
</scale-down>
</live-only>
Expand All @@ -509,6 +510,7 @@ public void testParsingScaleDownConfig() throws Exception {
assertEquals(1, connectors.size());
String connector = connectors.get(0);
assertEquals("server0-connector", connector);
assertEquals(33, scaledownCfg.getCommitInterval());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ public void testFileConfiguration() {
assertNotNull(lopc.getScaleDownConfiguration());
assertEquals("boo!", lopc.getScaleDownConfiguration().getGroupName());
assertEquals("dg1", lopc.getScaleDownConfiguration().getDiscoveryGroup());
assertEquals(33, lopc.getScaleDownConfiguration().getCommitInterval());

for (ClusterConnectionConfiguration ccc : conf.getClusterConfigurations()) {
if (ccc.getName().equals("cluster-connection3")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@
<group-name>boo!</group-name>
<!--either a discovery group-->
<discovery-group-ref discovery-group-name="dg1"/>
<commit-interval>33</commit-interval>
</scale-down>
</primary-only>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@
<group-name>boo!</group-name>
<!--either a discovery group-->
<discovery-group-ref discovery-group-name="dg1"/>
<commit-interval>33</commit-interval>
</scale-down>
</primary-only>

Expand Down
Loading

0 comments on commit 27f76f5

Please sign in to comment.