Skip to content

Commit

Permalink
ARTEMIS-4733 Infinite mirror reflections after CreateAddress
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Apr 20, 2024
1 parent ed59b0e commit fdf2ea8
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ public long getMessageCountOnQueue(String queueName) throws Exception {
return simpleManagementLong(ResourceNames.QUEUE + queueName, "getMessageCount");
}

public long getMessageAddedOnQueue(String queueName) throws Exception {
return simpleManagementLong(ResourceNames.QUEUE + queueName, "getMessagesAdded");
}

public int getDeliveringCountOnQueue(String queueName) throws Exception {
return simpleManagementInt(ResourceNames.QUEUE + queueName, "getDeliveringCount");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,10 +542,6 @@ private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) thr
server.callBrokerAddressPlugins(plugin -> plugin.beforeAddAddress(addressInfo, reload));
}

if (!reload && mirrorControllerSource != null) {
mirrorControllerSource.addAddress(addressInfo);
}

boolean result;
if (reload) {
result = addressManager.reloadAddressInfo(addressInfo);
Expand All @@ -554,6 +550,10 @@ private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) thr
}
// only register address if it is new
if (result) {
if (!reload && mirrorControllerSource != null) {
mirrorControllerSource.addAddress(addressInfo);
}

try {
managementService.registerAddress(addressInfo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
Expand All @@ -49,8 +53,6 @@
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -80,18 +82,17 @@ public class ClusteredMirrorSoakTest extends SoakTestBase {
Process processDC2_node_A;
Process processDC2_node_B;


private static String DC1_NODEA_URI = "tcp://localhost:61616";
private static String DC1_NODEB_URI = "tcp://localhost:61617";
private static String DC2_NODEA_URI = "tcp://localhost:61618";
private static String DC2_NODEB_URI = "tcp://localhost:61619";

private static void createServer(String serverName, String connectionName, String clusterURI, String mirrorURI, int porOffset) throws Exception {
private static void createServer(String serverName, String connectionName, String clusterURI, String mirrorURI, int porOffset, boolean paging) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);

HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
cliCreateServer.setClustered(true);
cliCreateServer.setNoWeb(true);
Expand All @@ -115,24 +116,16 @@ private static void createServer(String serverName, String connectionName, Strin
Assert.assertTrue(brokerXml.exists());
// Adding redistribution delay to broker configuration
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by ClusteredMirrorSoakTest.java --> \n"));
if (paging) {
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-size-messages>-1</max-size-messages>", "<max-size-messages>1</max-size-messages>"));
}
}


@Before
public void cleanupServers() {
cleanupData(DC1_NODE_A);
cleanupData(DC1_NODE_B);
cleanupData(DC2_NODE_A);
cleanupData(DC2_NODE_B);
}


@BeforeClass
public static void createServers() throws Exception {
createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0);
createServer(DC1_NODE_B, "mirror", DC1_NODEA_URI, DC2_NODEB_URI, 1);
createServer(DC2_NODE_A, "mirror", DC2_NODEB_URI, DC1_NODEA_URI, 2);
createServer(DC2_NODE_B, "mirror", DC2_NODEA_URI, DC1_NODEB_URI, 3);
public static void createRealServers(boolean paging) throws Exception {
createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0, paging);
createServer(DC1_NODE_B, "mirror", DC1_NODEA_URI, DC2_NODEB_URI, 1, paging);
createServer(DC2_NODE_A, "mirror", DC2_NODEB_URI, DC1_NODEA_URI, 2, paging);
createServer(DC2_NODE_B, "mirror", DC2_NODEA_URI, DC1_NODEB_URI, 3, paging);
}

private void startServers() throws Exception {
Expand All @@ -147,8 +140,147 @@ private void startServers() throws Exception {
ServerUtil.waitForServerToStart(3, 10_000);
}

@Test
public void testAvoidReflections() throws Exception {
createRealServers(true);

String internalQueue = "INTERNAL_QUEUE";

ActiveMQServer tempServer = createServer(true);
tempServer.getConfiguration().setBindingsDirectory(getServerLocation(DC1_NODE_A) + "/data/bindings");
tempServer.getConfiguration().setJournalDirectory(getServerLocation(DC1_NODE_A) + "/data/journal");
tempServer.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
tempServer.start();
tempServer.addAddressInfo(new AddressInfo(internalQueue).addRoutingType(RoutingType.ANYCAST).setInternal(true));
tempServer.createQueue(new QueueConfiguration(internalQueue).setDurable(true).setRoutingType(RoutingType.ANYCAST).setInternal(true).setAddress(internalQueue));
tempServer.stop();

startServers();

SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
SimpleManagement simpleManagementDC2A = new SimpleManagement(DC2_NODEA_URI, null, null);
SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEA_URI, null, null);
SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null);

String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";

String queueName = "myQueue";
String topicName = "order";

for (int i = 0; i < 5; i++) {
logger.info("DC1A={}", simpleManagementDC1A.getMessageAddedOnQueue(snfQueue));
logger.info("DC1B={}", simpleManagementDC1B.getMessageAddedOnQueue(snfQueue));
logger.info("DC2A={}", simpleManagementDC2A.getMessageAddedOnQueue(snfQueue));
logger.info("DC2B={}", simpleManagementDC2B.getMessageAddedOnQueue(snfQueue));

// no load generated.. just initial queues should have been sent
Assert.assertTrue(simpleManagementDC1A.getMessageAddedOnQueue(snfQueue) < 20);
Assert.assertTrue(simpleManagementDC2A.getMessageAddedOnQueue(snfQueue) < 20);
Assert.assertTrue(simpleManagementDC1B.getMessageAddedOnQueue(snfQueue) < 20);
Assert.assertTrue(simpleManagementDC2B.getMessageAddedOnQueue(snfQueue) < 20);
Thread.sleep(100);
}

Assert.assertEquals(0, simpleManagementDC2A.getMessageCountOnQueue(queueName));
Assert.assertEquals(0, simpleManagementDC1A.getMessageCountOnQueue(internalQueue));
try {
simpleManagementDC2A.getMessageCountOnQueue(internalQueue);
Assert.fail("Exception expected");
} catch (Exception expected) {
}

ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI);

int numberOfMessages = 1_000;

try (Connection connection = connectionFactoryDC1A.createConnection()) {
connection.setClientID("conn1");
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(topicName);
MessageConsumer con = session.createDurableConsumer(topic, "hello1");
MessageConsumer con2 = session.createDurableConsumer(topic, "hello2");

MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < numberOfMessages; i++) {
if (i % 100 == 0) {
logger.info("Sent topic {}", i);
}
producer.send(session.createTextMessage("hello " + i));
}
session.commit();

}

try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);

MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < numberOfMessages; i++) {
if (i % 100 == 0) {
logger.info("Sent queue {}", i);
}
producer.send(session.createTextMessage("hello " + i));
}
session.commit();
}

Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000);
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000);
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000);
Wait.assertEquals(numberOfMessages, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000);

try (Connection connection = connectionFactoryDC1A.createConnection()) {
connection.setClientID("conn1");
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(topicName);
Queue queue = session.createQueue(queueName);
MessageConsumer[] consumers = new MessageConsumer[] {session.createDurableSubscriber(topic, "hello1"), session.createDurableSubscriber(topic, "hello2"), session.createConsumer(queue)};

for (MessageConsumer c : consumers) {
for (int i = 0; i < numberOfMessages; i++) {
Assert.assertNotNull(c.receive(5000));
if (i % 100 == 0) {
session.commit();
}
}
session.commit();
}
}

Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue(queueName), 5000);
Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000);
Wait.assertEquals(0, () -> simpleManagementDC1A.getMessageCountOnQueue("conn1.hello2"), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000);
Wait.assertEquals(0, () -> simpleManagementDC2A.getMessageCountOnQueue("conn1.hello2"), 5000);

long countDC1A = simpleManagementDC1A.getMessageAddedOnQueue(snfQueue);
long countDC1B = simpleManagementDC1B.getMessageAddedOnQueue(snfQueue);

for (int i = 0; i < 10; i++) {
// DC1 should be quiet and nothing moving out of it
Assert.assertEquals(countDC1A, simpleManagementDC1A.getMessageAddedOnQueue(snfQueue));
Assert.assertEquals(countDC1B, simpleManagementDC1B.getMessageAddedOnQueue(snfQueue));

// DC2 is totally passive, nothing should have been generated
Assert.assertTrue(simpleManagementDC2A.getMessageAddedOnQueue(snfQueue) < 20);
Assert.assertTrue(simpleManagementDC2B.getMessageAddedOnQueue(snfQueue) < 20);
// we take intervals, allowing to make sure it doesn't grow
Thread.sleep(100);
logger.info("DC1A={}", simpleManagementDC1A.getMessageAddedOnQueue(snfQueue));
logger.info("DC1B={}", simpleManagementDC1B.getMessageAddedOnQueue(snfQueue));
logger.info("DC2A={}", simpleManagementDC2A.getMessageAddedOnQueue(snfQueue));
logger.info("DC2B={}", simpleManagementDC2B.getMessageAddedOnQueue(snfQueue));
}
}

@Test
public void testSimpleQueue() throws Exception {
createRealServers(false);
startServers();

final int numberOfMessages = 200;
Expand Down Expand Up @@ -303,6 +435,7 @@ public void testAutoCreateQueue() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
runAfter(executorService::shutdownNow);

createRealServers(false);
startServers();

String queueName = "testqueue" + RandomUtil.randomString();
Expand Down Expand Up @@ -370,6 +503,7 @@ public void testAutoCreateQueue() throws Exception {

@Test
public void testMirroredTopics() throws Exception {
createRealServers(false);
startServers();

final int numberOfMessages = 200;
Expand All @@ -389,8 +523,8 @@ public void testMirroredTopics() throws Exception {
SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEB_URI, null, null);
SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null);

consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false);
consume(connectionFactoryDC1B, clientIDB, subscriptionID, 0, 0, false);
consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false);
consume(connectionFactoryDC1B, clientIDB, subscriptionID, 0, 0, false);

try (Connection connection = connectionFactoryDC1B.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Expand All @@ -414,7 +548,7 @@ public void testMirroredTopics() throws Exception {
}

logger.debug("Consuming from DC1B");
consume(connectionFactoryDC1B, clientIDB, subscriptionID, 0, numberOfMessages / 2, false);
consume(connectionFactoryDC1B, clientIDB, subscriptionID, 0, numberOfMessages / 2, false);

processDC2_node_B.destroyForcibly();
processDC2_node_B.waitFor();
Expand All @@ -425,12 +559,12 @@ public void testMirroredTopics() throws Exception {

logger.debug("Consuming from DC2B with {}", simpleManagementDC2B.getMessageCountOnQueue("nodeB.my-order"));

consume(connectionFactoryDC2B, clientIDB, subscriptionID, numberOfMessages / 2, numberOfMessages / 2, true);
consume(connectionFactoryDC2B, clientIDB, subscriptionID, numberOfMessages / 2, numberOfMessages / 2, true);

Wait.assertEquals(0, () -> simpleManagementDC2B.getMessageCountOnQueue("nodeB.my-order"), 10000);

Wait.assertEquals(0, () -> simpleManagementDC1B.getMessageCountOnQueue("nodeB.my-order"), 10000);
consume(connectionFactoryDC1B, clientIDB, subscriptionID, numberOfMessages, 0, true);
consume(connectionFactoryDC1B, clientIDB, subscriptionID, numberOfMessages, 0, true);
logger.debug("DC1B nodeB.my-order=0");
}

Expand Down

0 comments on commit fdf2ea8

Please sign in to comment.