diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index e3bd6feefb69..1c3f4789ce28 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -470,6 +470,11 @@ private Binding getNextBinding(final Message message, // if no bindings were found, we will apply a secondary level on the routing logic if (lastLowPriorityBinding != -1) { nextBinding = bindings[lastLowPriorityBinding]; + if (nextBinding != null && loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) { + //return before changing bindingIndex, otherwise every incoming message sets the index to the same position. + //bindingIndex is shared with the redistributor + return nextBinding; + } nextPosition = moveNextPosition(lastLowPriorityBinding, bindingsCount); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java index 3f8c8257986d..c297a90dfe30 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -1011,6 +1011,40 @@ public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistributio } + @Test + public void testEvenRedistributionLbOffWithRedistribution() throws Exception { + final int messageCount = 1000; + final String queue = "queues.test"; + + setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION); + startServers(0, 1, 2); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); + + createQueue(0, queue, queue, null, false, RoutingType.ANYCAST); + createQueue(1, queue, queue, null, false, RoutingType.ANYCAST); + createQueue(2, queue, queue, null, false, RoutingType.ANYCAST); + + addConsumer(0, 1, queue, null); + addConsumer(1, 2, queue, null); + + waitForBindings(0, queue, 1, 0, true); + waitForBindings(1, queue, 1, 1, true); + waitForBindings(2, queue, 1, 1, true); + + waitForBindings(0, queue, 2, 2, false); + waitForBindings(1, queue, 2, 1, false); + waitForBindings(2, queue, 2, 1, false); + + send(0, queue, messageCount * 2, false, null); + + Wait.assertEquals(0L, () -> servers[0].getTotalMessageCount(), 5000, 100); + Assert.assertEquals(messageCount, servers[1].getTotalMessageCount()); + Assert.assertEquals(messageCount, servers[2].getTotalMessageCount()); + } + @Test public void testRedistributionToRemoteMulticastConsumerLbOffWithRedistribution() throws Exception {