From 096a869c1ed6102a43074434db07899c70231c4a Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Thu, 25 Jul 2024 22:50:14 +0800 Subject: [PATCH] ARTEMIS-4954 AddressControl.pause() can pause the snf queue --- .../artemis/core/server/impl/AddressInfo.java | 14 +-- .../bridge/ClusteredBridgeReconnectTest.java | 107 ++++++++++++++++++ 2 files changed, 114 insertions(+), 7 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java index 44942f5e9a9..e35a30828c5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.server.impl; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.json.JsonArray; import org.apache.activemq.artemis.json.JsonArrayBuilder; import org.apache.activemq.artemis.json.JsonNumber; @@ -35,7 +36,6 @@ import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.PostOffice; -import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener; import org.apache.activemq.artemis.utils.CompositeAddress; @@ -216,8 +216,8 @@ public synchronized void reloadPause(long recordID) { Bindings bindings = postOffice.lookupBindingsForAddress(this.getName()); if (bindings != null) { for (Binding binding : bindings.getBindings()) { - if (binding instanceof QueueBinding) { - ((QueueBinding) binding).getQueue().pause(false); + if (binding instanceof LocalQueueBinding) { + ((LocalQueueBinding) binding).getQueue().pause(false); } } } @@ -250,8 +250,8 @@ public synchronized void pause(boolean persist) { Bindings bindings = postOffice.lookupBindingsForAddress(this.getName()); if (bindings != null) { for (Binding binding : bindings.getBindings()) { - if (binding instanceof QueueBinding) { - ((QueueBinding) binding).getQueue().pause(false); + if (binding instanceof LocalQueueBinding) { + ((LocalQueueBinding) binding).getQueue().pause(false); } } } @@ -278,8 +278,8 @@ public synchronized void resume() { Bindings bindings = postOffice.lookupBindingsForAddress(this.getName()); if (bindings != null) { for (Binding binding : bindings.getBindings()) { - if (binding instanceof QueueBinding) { - ((QueueBinding) binding).getQueue().resume(); + if (binding instanceof LocalQueueBinding) { + ((LocalQueueBinding) binding).getQueue().resume(); } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java index e2ac8e8aa95..62fecf764db 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java @@ -17,7 +17,9 @@ package org.apache.activemq.artemis.tests.integration.cluster.bridge; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; @@ -28,17 +30,26 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.TopologyMember; +import org.apache.activemq.artemis.api.core.management.AddressControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.Bindings; +import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; +import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.apache.activemq.artemis.tests.util.Wait; import org.junit.jupiter.api.AfterEach; @@ -305,6 +316,102 @@ public void testClusterBridgeAddRemoteBinding() throws Exception { stopServers(0, 1); } + @Test + public void testPauseAddressBlockingSnFQueue() throws Exception { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); + + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setRedistributionDelay(0); + + servers[0].getAddressSettingsRepository().addMatch("#", addressSettings); + servers[1].getAddressSettingsRepository().addMatch("#", addressSettings); + + startServers(0, 1); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + ClientSession session0 = sfs[0].createSession(); + ClientSession session1 = sfs[1].createSession(); + + session0.start(); + session1.start(); + + createQueue(0, "queues.testaddress", "queue1", null, true); + createQueue(1, "queues.testaddress", "queue1", null, true); + ClientConsumer consumer1 = session1.createConsumer("queue1"); + + waitForBindings(0, "queues.testaddress", 1, 0, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + + waitForBindings(0, "queues.testaddress", 1, 1, false); + waitForBindings(1, "queues.testaddress", 1, 0, false); + + final int num = 10; + //normal message flow should work + ClientProducer goodProducer0 = session0.createProducer("queues.testaddress"); + for (int i = 0; i < num; i++) { + Message msg = session0.createMessage(true); + msg.putStringProperty("origin", "from producer 0"); + goodProducer0.send(msg); + } + + //consumer1 can receive from node0 + for (int i = 0; i < num; i++) { + ClientMessage m = consumer1.receive(5000); + assertNotNull(m); + String propValue = m.getStringProperty("origin"); + assertEquals("from producer 0", propValue); + m.acknowledge(); + } + assertNull(consumer1.receiveImmediate()); + + //pause address from node0 + String addressControlResourceName = ResourceNames.ADDRESS + "queues.testaddress"; + Object resource = servers[0].getManagementService().getResource(addressControlResourceName); + AddressControl addressControl0 = (AddressControl) resource; + addressControl0.pause(); + + Bindings bindings0 = servers[0].getPostOffice().getBindingsForAddress(SimpleString.of("queues.testaddress")); + assertNotNull(bindings0); + assertEquals(2, bindings0.getBindings().size()); + boolean localBindingPaused = false; + boolean remoteBindingPaused = true; + for (Binding bd : bindings0.getBindings()) { + if (bd instanceof LocalQueueBinding) { + localBindingPaused = ((LocalQueueBinding)bd).getQueue().isPaused(); + } + if (bd instanceof RemoteQueueBinding) { + remoteBindingPaused = ((RemoteQueueBinding)bd).getQueue().isPaused(); + } + } + assertTrue(localBindingPaused); + assertFalse(remoteBindingPaused); + + //now message should flow to node 1 regardless of the pause + for (int i = 0; i < num; i++) { + Message msg = session0.createMessage(true); + msg.putStringProperty("origin", "from producer 0"); + goodProducer0.send(msg); + } + + //consumer1 can receive from node0 + for (int i = 0; i < num; i++) { + ClientMessage m = consumer1.receive(5000); + assertNotNull(m); + String propValue = m.getStringProperty("origin"); + assertEquals("from producer 0", propValue); + m.acknowledge(); + } + assertNull(consumer1.receiveImmediate()); + + stopServers(0, 1); + } @Override @AfterEach