From f38ea1120c31c6587d3ae821de7e5139de2e2684 Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Fri, 18 Aug 2023 22:54:44 +0800 Subject: [PATCH 1/2] [fix][broker] Fix incorrect unack msk count when dup ack a message (#20990) (cherry picked from commit 4facdad9c7e8f6558dfb71b230aa8260e57b559e) --- .../pulsar/broker/service/Consumer.java | 24 +++++++++++------ .../broker/service/BrokerServiceTest.java | 26 +++++++++++++++++++ 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 8924b750eb624..a7c06d0c85d63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -444,6 +444,7 @@ public CompletableFuture messageAcked(CommandAck ack) { private CompletableFuture individualAckNormal(CommandAck ack, Map properties) { List positionsAcked = new ArrayList<>(); long totalAckCount = 0; + boolean individualAck = false; for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); PositionImpl position; @@ -467,14 +468,18 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + @Cleanup + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("sub-1") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .isAckReceiptEnabled(true) + .subscribe(); + producer.send("1".getBytes(StandardCharsets.UTF_8)); + Message message = consumer1.receive(); + consumer1.acknowledge(message); + consumer1.acknowledge(message); + assertEquals(admin.topics().getStats(topicName).getSubscriptions() + .get("sub-1").getUnackedMessages(), 0); + } } From 05701d76e0b13eaff73660ba7c7c2580eb9c820a Mon Sep 17 00:00:00 2001 From: labuladong Date: Wed, 7 Dec 2022 14:48:44 +0800 Subject: [PATCH 2/2] [fix][test] flaky test `testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction` (#18726) (cherry picked from commit 2d205c93cb9e1324834e0818de0edc531b66a119) --- .../impl/KeySharedSubscriptionTest.java | 136 ++++++++---------- 1 file changed, 56 insertions(+), 80 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java index 213296d22833a..d2288f948b830 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java @@ -18,32 +18,35 @@ */ package org.apache.pulsar.client.impl; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import lombok.Cleanup; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.util.Murmur3_32Hash; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; @Test(groups = "broker-impl") public class KeySharedSubscriptionTest extends ProducerConsumerBase { @@ -70,91 +73,58 @@ public Object[][] subType() { @Test(dataProvider = "subType") public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(SubscriptionType subscriptionType) throws PulsarClientException { - PulsarClient pulsarClient = PulsarClient.builder(). - serviceUrl(lookupUrl.toString()) - .build(); final int totalMsg = 1000; String topic = "broker-close-test-" + RandomStringUtils.randomAlphabetic(5); - Map, List> nameToId = Maps.newConcurrentMap(); + Map, List> nameToId = new ConcurrentHashMap<>(); Set pubMessages = Sets.newConcurrentHashSet(); Set recMessages = Sets.newConcurrentHashSet(); AtomicLong lastActiveTime = new AtomicLong(); AtomicBoolean canAcknowledgement = new AtomicBoolean(false); - @Cleanup - Consumer consumer1 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub-1") - .subscriptionType(subscriptionType) - .consumerName("con-1") - .messageListener((cons1, msg) -> { - lastActiveTime.set(System.currentTimeMillis()); - nameToId.computeIfAbsent(cons1,(k) -> new ArrayList<>()) - .add(msg.getMessageId()); - recMessages.add(msg.getMessageId()); - if (canAcknowledgement.get()) { - try { - cons1.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - } - }) - .subscribe(); - @Cleanup - Consumer consumer2 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub-1") - .subscriptionType(subscriptionType) - .messageListener((cons2, msg) -> { - lastActiveTime.set(System.currentTimeMillis()); - nameToId.computeIfAbsent(cons2,(k) -> new ArrayList<>()) - .add(msg.getMessageId()); - recMessages.add(msg.getMessageId()); - if (canAcknowledgement.get()) { - try { - cons2.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); + List> consumerList = new ArrayList<>(); + // create 3 consumers + for (int i = 0; i < 3; i++) { + ConsumerBuilder builder = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub-1") + .subscriptionType(subscriptionType) + .messageListener((consumer, msg) -> { + lastActiveTime.set(System.currentTimeMillis()); + nameToId.computeIfAbsent(consumer, (k) -> new ArrayList<>()) + .add(msg.getMessageId()); + recMessages.add(msg.getMessageId()); + if (canAcknowledgement.get()) { + try { + consumer.acknowledge(msg); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } } - } - }) - .consumerName("con-2") - .subscribe(); - @Cleanup - Consumer consumer3 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub-1") - .subscriptionType(subscriptionType) - .messageListener((cons3, msg) -> { - lastActiveTime.set(System.currentTimeMillis()); - nameToId.computeIfAbsent(cons3,(k) -> new ArrayList<>()) - .add(msg.getMessageId()); - recMessages.add(msg.getMessageId()); - if (canAcknowledgement.get()) { - try { - cons3.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - } - }) - .consumerName("con-3") - .subscribe(); + }); + + if (subscriptionType == SubscriptionType.Key_Shared) { + // ensure every consumer can be distributed messages + int hash = Murmur3_32Hash.getInstance().makeHash(("key-" + i).getBytes()) + % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE; + builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, hash))); + } + + consumerList.add(builder.subscribe()); + } - @Cleanup Producer producer = pulsarClient.newProducer() .topic(topic) .enableBatching(true) .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) // We chose 9 because the maximum unacked message is 10 .batchingMaxMessages(9) + .batcherBuilder(BatcherBuilder.KEY_BASED) .create(); for (int i = 0; i < totalMsg; i++) { - producer.sendAsync(UUID.randomUUID().toString() - .getBytes(StandardCharsets.UTF_8)) - .thenAccept(pubMessages::add); + byte[] msg = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); + producer.newMessage().key("key-" + (i % 3)).value(msg) + .sendAsync().thenAccept(pubMessages::add); } // Wait for all consumers can not read more messages. the consumers are stuck by max unacked messages. @@ -176,7 +146,7 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc // Wait for all consumers to continue receiving messages. Awaitility.await() - .atMost(30, TimeUnit.SECONDS) + .atMost(15, TimeUnit.SECONDS) .pollDelay(5, TimeUnit.SECONDS) .until(() -> (System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5)); @@ -186,5 +156,11 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(Subsc Assert.assertEquals(pubMessages.size(), totalMsg); Assert.assertEquals(pubMessages.size(), recMessages.size()); Assert.assertTrue(recMessages.containsAll(pubMessages)); + + // cleanup + producer.close(); + for (Consumer consumer : consumerList) { + consumer.close(); + } } }