-
Notifications
You must be signed in to change notification settings - Fork 928
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
NO-JIRA Test validating multiple mirrors connected to a node
- Loading branch information
1 parent
5d5b2fe
commit 817dc41
Showing
1 changed file
with
223 additions
and
0 deletions.
There are no files selected for viewing
223 changes: 223 additions & 0 deletions
223
...a/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/MultiMirrorSoakTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror; | ||
|
||
import javax.jms.Connection; | ||
import javax.jms.ConnectionFactory; | ||
import javax.jms.MessageConsumer; | ||
import javax.jms.MessageProducer; | ||
import javax.jms.Queue; | ||
import javax.jms.Session; | ||
import javax.jms.TextMessage; | ||
import java.io.File; | ||
import java.io.StringWriter; | ||
import java.lang.invoke.MethodHandles; | ||
import java.util.Properties; | ||
|
||
import org.apache.activemq.artemis.api.core.management.SimpleManagement; | ||
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; | ||
import org.apache.activemq.artemis.tests.soak.SoakTestBase; | ||
import org.apache.activemq.artemis.tests.util.CFUtil; | ||
import org.apache.activemq.artemis.util.ServerUtil; | ||
import org.apache.activemq.artemis.utils.FileUtil; | ||
import org.apache.activemq.artemis.utils.Wait; | ||
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate; | ||
import org.junit.Assert; | ||
import org.junit.Test; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class MultiMirrorSoakTest extends SoakTestBase { | ||
|
||
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); | ||
|
||
private static String largeBody; | ||
private static String smallBody = "This is a small body"; | ||
|
||
static { | ||
StringWriter writer = new StringWriter(); | ||
while (writer.getBuffer().length() < 1024 * 1024) { | ||
writer.append("This is a large string ..... "); | ||
} | ||
largeBody = writer.toString(); | ||
} | ||
|
||
public static final String DC1_NODE_A = "MultiMirrorSoakTest/DC1"; | ||
public static final String DC2_NODE_A = "MultiMirrorSoakTest/DC2"; | ||
public static final String DC3_NODE_A = "MultiMirrorSoakTest/DC3"; | ||
|
||
Process processDC1_node_A; | ||
Process processDC2_node_A; | ||
Process processDC3_node_A; | ||
|
||
private static String DC1_NODEA_URI = "tcp://localhost:61616"; | ||
private static String DC2_NODEA_URI = "tcp://localhost:61617"; | ||
private static String DC3_NODEA_URI = "tcp://localhost:61618"; | ||
|
||
private static void createServer(String serverName, | ||
int porOffset, | ||
boolean paging, | ||
String... mirrorTo) throws Exception { | ||
File serverLocation = getFileServerLocation(serverName); | ||
deleteDirectory(serverLocation); | ||
|
||
HelperCreate cliCreateServer = new HelperCreate(); | ||
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation); | ||
cliCreateServer.setMessageLoadBalancing("ON_DEMAND"); | ||
cliCreateServer.setClustered(false); | ||
cliCreateServer.setNoWeb(true); | ||
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE_A); | ||
cliCreateServer.addArgs("--addresses", "order"); | ||
cliCreateServer.addArgs("--queues", "myQueue"); | ||
cliCreateServer.setPortOffset(porOffset); | ||
cliCreateServer.createServer(); | ||
|
||
Properties brokerProperties = new Properties(); | ||
brokerProperties.put("largeMessageSync", "false"); | ||
if (mirrorTo != null && mirrorTo.length > 0) { | ||
int mirrorID = 0; | ||
for (String p : mirrorTo) { | ||
brokerProperties.put("AMQPConnections.mirror" + mirrorID + ".uri", p); | ||
brokerProperties.put("AMQPConnections.mirror" + mirrorID + ".retryInterval", "100"); | ||
brokerProperties.put("AMQPConnections.mirror" + mirrorID + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString()); | ||
brokerProperties.put("AMQPConnections.mirror" + mirrorID + ".connectionElements.mirror.sync", "false"); | ||
mirrorID++; | ||
} | ||
} | ||
File brokerPropertiesFile = new File(serverLocation, "broker.properties"); | ||
saveProperties(brokerProperties, brokerPropertiesFile); | ||
|
||
File brokerXml = new File(serverLocation, "/etc/broker.xml"); | ||
Assert.assertTrue(brokerXml.exists()); | ||
// Adding redistribution delay to broker configuration | ||
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by ClusteredMirrorSoakTest.java --> \n")); | ||
if (paging) { | ||
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-size-messages>-1</max-size-messages>", "<max-size-messages>1</max-size-messages>")); | ||
} | ||
} | ||
|
||
public static void createRealServers(boolean paging) throws Exception { | ||
createServer(DC1_NODE_A, 0, paging, DC2_NODEA_URI, DC3_NODEA_URI); | ||
createServer(DC2_NODE_A, 1, paging, DC1_NODEA_URI); | ||
createServer(DC3_NODE_A, 2, paging, DC1_NODEA_URI); | ||
} | ||
|
||
private void startServers() throws Exception { | ||
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties")); | ||
processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties")); | ||
processDC3_node_A = startServer(DC3_NODE_A, -1, -1, new File(getServerLocation(DC3_NODE_A), "broker.properties")); | ||
|
||
ServerUtil.waitForServerToStart(0, 10_000); | ||
ServerUtil.waitForServerToStart(1, 10_000); | ||
ServerUtil.waitForServerToStart(2, 10_000); | ||
} | ||
|
||
@Test | ||
public void testMultiMirror() throws Exception { | ||
createRealServers(false); | ||
startServers(); | ||
internalMirror(DC1_NODEA_URI, DC3_NODEA_URI); | ||
internalMirror(DC1_NODEA_URI, DC2_NODEA_URI); | ||
internalMirror(DC3_NODEA_URI, DC1_NODEA_URI); | ||
internalMirror(DC1_NODEA_URI, DC1_NODEA_URI); | ||
internalMirror(DC2_NODEA_URI, DC3_NODEA_URI); | ||
} | ||
|
||
public void internalMirror(String producerURI, String consumerURi) throws Exception { | ||
final int numberOfMessages = 200; | ||
|
||
Assert.assertTrue("numberOfMessages must be even", numberOfMessages % 2 == 0); | ||
|
||
ConnectionFactory producerCF = CFUtil.createConnectionFactory("amqp", producerURI); | ||
|
||
SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null); | ||
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null); | ||
SimpleManagement simpleManagementDC3A = new SimpleManagement(DC3_NODEA_URI, null, null); | ||
|
||
String queueName = "myQueue"; | ||
|
||
try (Connection connection = producerCF.createConnection()) { | ||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); | ||
Queue queue = session.createQueue(queueName); | ||
MessageProducer producer = session.createProducer(queue); | ||
|
||
for (int i = 0; i < numberOfMessages; i++) { | ||
TextMessage message; | ||
boolean large; | ||
if (i % 1 == 2) { | ||
message = session.createTextMessage(largeBody); | ||
large = true; | ||
} else { | ||
message = session.createTextMessage(smallBody); | ||
large = false; | ||
} | ||
message.setIntProperty("i", i); | ||
message.setBooleanProperty("large", large); | ||
producer.send(message); | ||
if (i % 100 == 0) { | ||
logger.debug("commit {}", i); | ||
session.commit(); | ||
|
||
Wait.assertEquals(i + 1, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName)); | ||
Wait.assertEquals(i + 1, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName)); | ||
Wait.assertEquals(i + 1, () -> simpleManagementDC3A.getMessageCountOnQueue(queueName)); | ||
} | ||
} | ||
session.commit(); | ||
} | ||
|
||
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName)); | ||
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName)); | ||
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC3A.getMessageCountOnQueue(queueName)); | ||
|
||
ConnectionFactory consumerCF = CFUtil.createConnectionFactory("amqp", consumerURi); | ||
|
||
try (Connection connection = consumerCF.createConnection()) { | ||
connection.start(); | ||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); | ||
Queue queue = session.createQueue(queueName); | ||
MessageConsumer consumer = session.createConsumer(queue); | ||
|
||
for (int i = 0; i < numberOfMessages; i++) { | ||
TextMessage message; | ||
boolean large; | ||
if (i % 1 == 2) { | ||
large = true; | ||
} else { | ||
large = false; | ||
} | ||
message = (TextMessage) consumer.receive(5000); | ||
Assert.assertNotNull(message); | ||
Assert.assertEquals(i, message.getIntProperty("i")); | ||
Assert.assertEquals(large, message.getBooleanProperty("large")); | ||
if (i % 100 == 0) { | ||
logger.debug("commit {}", i); | ||
session.commit(); | ||
|
||
Wait.assertEquals(numberOfMessages - i - 1, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000); | ||
Wait.assertEquals(numberOfMessages - i - 1, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000); | ||
Wait.assertEquals(numberOfMessages - i - 1, () -> simpleManagementDC3A.getMessageCountOnQueue(queueName), 5000); | ||
} | ||
} | ||
session.commit(); | ||
} | ||
Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName)); | ||
Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName)); | ||
Wait.assertEquals(0, () -> simpleManagementDC3A.getMessageCountOnQueue(queueName)); | ||
} | ||
} |