[Bug] Hash range collision causes out-of-order cases between existing consumers in Key_Shared #23315

Closed
2 of 3 tasks
equanz opened this issue Sep 18, 2024 · 13 comments · Fixed by #23327
Labels
type/bug The PR fixed a bug or issue reported a bug


@equanz
Contributor

equanz commented Sep 18, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

Tested with https://github.com/apache/pulsar/tree/4f96146f13b136644a4eb0cf4ec36699e0431929 .

Minimal reproduce step

Apply the following patch and run the test.

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index 48311c5733..685baeef9d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -29,11 +29,13 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
 import org.apache.pulsar.client.api.Range;
 import org.testng.Assert;
 import org.testng.annotations.Test;

+@Slf4j
 @Test(groups = "broker")
 public class ConsistentHashingStickyKeyConsumerSelectorTest {

@@ -216,4 +218,35 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest {
         // then there should be no mapping remaining
         Assert.assertEquals(selector.getConsumerKeyHashRanges().size(), 0);
     }
+
+    @Test
+    public void testModifyMappingBetweenExistingConsumers() {
+        final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+        final String consumerName = "c1";
+        final int numOfInitialConsumers = 3;
+        for (int i = 0; i < numOfInitialConsumers; i++) {
+            final Consumer consumer = mock(Consumer.class);
+            when(consumer.consumerName()).thenReturn(consumerName);
+            when(consumer.consumerId()).thenReturn((long) i);
+            selector.addConsumer(consumer);
+        }
+
+        final int keyHash = numOfInitialConsumers + 1;
+        // get expected consumer
+        final Consumer expectedConsumer = selector.select(keyHash);
+
+        // add new consumer
+        final Consumer newConsumer = mock(Consumer.class);
+        when(newConsumer.consumerName()).thenReturn(consumerName);
+        when(newConsumer.consumerId()).thenReturn((long) numOfInitialConsumers);
+        selector.addConsumer(newConsumer);
+
+        final Consumer actualConsumer = selector.select(keyHash);
+        try {
+            Assert.assertEquals(actualConsumer.consumerId(), expectedConsumer.consumerId());
+        } catch (AssertionError e) {
+            // if it changes (that is normal behavior), expect it to be the new consumer
+            Assert.assertEquals(actualConsumer.consumerId(), newConsumer.consumerId());
+        }
+    }
 }

What did you expect to see?

In auto-split hash mode, we expect a newly added consumer to take hash ranges only from existing consumers.
(The dispatcher handles this case with recentlyJoinedConsumers.)

So, a hash range should never move from one existing consumer to another existing consumer.

What did you see instead?

[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.289 s <<< FAILURE! - in org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelectorTest
[ERROR] org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelectorTest.testModifyMappingBetweenExistingConsumers  Time elapsed: 0.212 s  <<< FAILURE!
java.lang.AssertionError: expected [3] but found [0]
        at org.testng.Assert.fail(Assert.java:110)
        at org.testng.Assert.failNotEquals(Assert.java:1577)
        at org.testng.Assert.assertEqualsImpl(Assert.java:149)
        at org.testng.Assert.assertEquals(Assert.java:131)
        at org.testng.Assert.assertEquals(Assert.java:979)
        at org.testng.Assert.assertEquals(Assert.java:955)
        at org.testng.Assert.assertEquals(Assert.java:989)
        at org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelectorTest.testModifyMappingBetweenExistingConsumers(ConsistentHashingStickyKeyConsumerSelectorTest.java:251)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:569)
        at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
        at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
        at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
        at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)

When hash ring points collide, the selector stores the colliding consumers in a collision list for that point, sorted by consumerName:

} else {
    if (!v.contains(consumer)) {
        v.add(consumer);
        v.sort(Comparator.comparing(Consumer::consumerName, String::compareTo));
    }
    return v;
}
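To make the collision path concrete, here is a simplified, self-contained model of the selector. It is a sketch under stated assumptions, not Pulsar's class: `SimpleConsumer`, `SimpleRing`, the `name + "#" + i` point key, and the use of `String.hashCode()` in place of the broker's real hash function are all hypothetical. It keeps the two behaviors described in this issue: colliding consumers share one list sorted by name, and the pick is `list.get(hash % list.size())`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CollisionRingSketch {
    // Hypothetical stand-in for org.apache.pulsar.broker.service.Consumer
    record SimpleConsumer(String name, long id) {}

    // Simplified hash ring: each point holds the list of consumers that hashed to it
    static class SimpleRing {
        private final TreeMap<Integer, List<SimpleConsumer>> ring = new TreeMap<>();
        private final int pointsPerConsumer;

        SimpleRing(int pointsPerConsumer) {
            this.pointsPerConsumer = pointsPerConsumer;
        }

        void add(SimpleConsumer consumer) {
            for (int i = 0; i < pointsPerConsumer; i++) {
                // Assumption: String.hashCode stands in for the broker's hash function;
                // the mask keeps ring points non-negative
                int point = (consumer.name() + "#" + i).hashCode() & Integer.MAX_VALUE;
                ring.compute(point, (k, v) -> {
                    if (v == null) {
                        v = new ArrayList<>();
                    }
                    if (!v.contains(consumer)) {
                        v.add(consumer);
                        // Same sort key as the quoted code; equal names keep insertion order
                        v.sort(Comparator.comparing(SimpleConsumer::name));
                    }
                    return v;
                });
            }
        }

        // Assumes at least one consumer has been added
        SimpleConsumer select(int hash) {
            // First point at or after the hash, wrapping around to the start of the ring
            Map.Entry<Integer, List<SimpleConsumer>> entry = ring.ceilingEntry(hash);
            if (entry == null) {
                entry = ring.firstEntry();
            }
            List<SimpleConsumer> list = entry.getValue();
            // The modulo pick discussed in this issue
            return list.get(hash % list.size());
        }
    }

    public static void main(String[] args) {
        SimpleRing ring = new SimpleRing(100);
        for (long id = 0; id < 3; id++) {
            ring.add(new SimpleConsumer("c1", id));
        }
        System.out.println(ring.select(4).id()); // 1
        ring.add(new SimpleConsumer("c1", 3));
        System.out.println(ring.select(4).id()); // 0
    }
}
```

Because all consumers share the name "c1", every ring point holds the same collision list, so the result does not depend on the actual hash values: appending consumerId 3 changes the pick for key hash 4 from consumerId 1 to consumerId 0, which matches the failing test.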

The selector then picks a consumer from that list using hash % list.size(). With keyHash = 4 from the test:

  • Before: 4 % 3 = 1, so it returns the consumer at index 1, which has consumerId 1.
  • After adding a new consumer with consumerId 3: 4 % 4 = 0, so it returns the consumer at index 0, which has consumerId 0.

Consumers 0 and 1 are both existing consumers, so the range moves from one existing consumer to another.
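Since the pick depends only on hash % size when all consumers share one collision list (as in the same-name test), growing the list from three to four consumers can be checked exhaustively over one period of 12 hashes, the least common multiple of 3 and 4. A minimal sketch, assuming list index i holds consumerId i as in the test; `ModuloMoveCount` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

public class ModuloMoveCount {
    // Returns the hashes in [0, 12) whose owner changes from one existing consumer
    // to another when a collision list grows from 3 to 4 consumers.
    // Assumes list index i holds consumerId i, so the owner is simply hash % size.
    static List<Integer> movedBetweenExisting() {
        final int before = 3;
        final int after = 4;
        final int newConsumerId = 3; // the consumer appended at index 3
        List<Integer> moved = new ArrayList<>();
        for (int hash = 0; hash < 12; hash++) {
            int oldOwner = hash % before;
            int newOwner = hash % after;
            // A change is acceptable only if the key goes to the new consumer
            if (newOwner != oldOwner && newOwner != newConsumerId) {
                moved.add(hash);
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<Integer> moved = movedBetweenExisting();
        System.out.println(moved);                   // [4, 5, 6, 8, 9, 10]
        System.out.println(moved.size() + " of 12"); // 6 of 12
    }
}
```

Of the 12 hashes, 3 keep their owner (0, 1, 2), 3 move to the new consumer (3, 7, 11), and the remaining 6 move from one existing consumer to another.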

The above case leads to out-of-order redelivery.
Shouldn't we care about this?

Anything else?

For simplicity, all consumers in this example have the same name. However, the issue is not limited to consumers with the same name; it also occurs when different names happen to produce colliding hashes.

(This issue was originally reported by @hrsakai .)

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@equanz equanz added the type/bug The PR fixed a bug or issue reported a bug label Sep 18, 2024
@lhotari
Member

lhotari commented Sep 18, 2024

@equanz @hrsakai Thanks for reporting. Coincidentally, I was investigating this today and had the same concern, but I hadn't validated it.

@lhotari
Member

lhotari commented Sep 19, 2024

Just guessing, but perhaps the intention of this code is to select a different client for each partition when there are multiple consumers with the same name:

@lhotari
Member

lhotari commented Sep 19, 2024

Slightly related issue about the incorrect results of getConsumerKeyHashRanges: #23321

@codelipenghui
Contributor

> The above case leads to out-of-order redelivery.

I don't think it will cause an out-of-order issue.

  • The ordering is not guaranteed by the hash selector; it is guaranteed by PersistentStickyKeyDispatcherMultipleConsumers.
  • A newly joined consumer only gets messages after the old consumers have acknowledged all outstanding messages.

Getting a different consumer for the same hash is expected, since the set of consumers has changed.

@lhotari
Member

lhotari commented Sep 19, 2024

> I don't think it will cause an out-of-order issue.
>
>   • The ordering is not guaranteed by the hash selector; it is guaranteed by PersistentStickyKeyDispatcherMultipleConsumers.
>   • A newly joined consumer only gets messages after the old consumers have acknowledged all outstanding messages.
>
> Getting a different consumer for the same hash is expected, since the set of consumers has changed.

The result of this bug is that the target consumer will switch also for existing consumers in certain cases. @codelipenghui Did you consider that case?

@lhotari
Member

lhotari commented Sep 20, 2024

I'm pretty sure that consumerList.get(hash % consumerList.size()) is wrong. The fix is explained in #23321 (comment) . I'll submit a PR.
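One way to stop keys moving between existing consumers is to make the choice within a collision list independent of the list size, for example by always picking the earliest-added consumer. The sketch below is hypothetical and is not necessarily what #23327 implements; `FirstAddedWinsSketch`, `CollisionList`, and their methods are illustrative names. It shows that appending a consumer never changes the owner of a ring point, and that removing the owner hands the point to the next-oldest consumer.

```java
import java.util.ArrayList;
import java.util.List;

public class FirstAddedWinsSketch {
    // Hypothetical collision list: consumers are kept in insertion order
    // and the earliest-added consumer owns the ring point.
    static class CollisionList {
        private final List<Long> consumerIds = new ArrayList<>();

        void add(long consumerId) {
            if (!consumerIds.contains(consumerId)) {
                consumerIds.add(consumerId); // append only; never re-sort
            }
        }

        void remove(long consumerId) {
            consumerIds.remove(Long.valueOf(consumerId)); // remove by value, not by index
        }

        // The owner depends on neither the key hash nor the list size.
        // Assumes the list is non-empty.
        long select() {
            return consumerIds.get(0);
        }
    }

    public static void main(String[] args) {
        CollisionList point = new CollisionList();
        point.add(0);
        point.add(1);
        point.add(2);
        System.out.println(point.select()); // 0
        point.add(3);                       // a new consumer joins
        System.out.println(point.select()); // still 0
        point.remove(0);                    // the owner leaves
        System.out.println(point.select()); // 1
    }
}
```

The trade-off is that a later consumer in a collision list gets no keys at that point until the consumers ahead of it leave; with many same-named consumers, the colliding points all land on the first one.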

@lhotari lhotari self-assigned this Sep 20, 2024
@codelipenghui
Contributor

> The result of this bug is that the target consumer will switch also for existing consumers in certain cases. @codelipenghui Did you consider that case?

@lhotari Oh, I get your point now. One consumer joining will change the key assignment for many other consumers. Thanks for the explanation.

@lhotari
Member

lhotari commented Sep 20, 2024

It looks like #8396 wasn't a correct solution at the time it was made. @codelipenghui I think that we need to address this for all maintenance branches.

@lhotari
Member

lhotari commented Sep 20, 2024

> It looks like #8396 wasn't a correct solution at the time it was made. @codelipenghui I think that we need to address this for all maintenance branches.

I have created #23327 to fix the issue. Please review.

@lhotari
Member

lhotari commented Sep 20, 2024

#23327 is now ready for review after multiple iterations.

@equanz
Contributor Author

equanz commented Sep 23, 2024

> Just guessing, but perhaps the intention of this code is to select a different client for each partition when there are multiple consumers with the same name:
> #23315 (comment)

The list is sorted by consumerName.

} else {
    if (!v.contains(consumer)) {
        v.add(consumer);
        v.sort(Comparator.comparing(Consumer::consumerName, String::compareTo));
    }
    return v;
}

The order of elements whose keys compare as equal seems to be undefined.
(I haven't rechecked, but I think that in my environment they stayed in insertion order.)
https://docs.oracle.com/javase/8/docs/api/java/util/ArrayList.html#sort-java.util.Comparator-
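For reference on the ordering question: in OpenJDK, `ArrayList.sort` delegates to `Arrays.sort` on its backing array, and the Javadoc for `Arrays.sort` with a `Comparator` states that the sort is guaranteed to be stable, so consumers with equal names should keep their insertion order. A small check, using a hypothetical `Named` record in place of `Consumer` and the same comparator as the quoted code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class StableSortCheck {
    // Hypothetical stand-in for a consumer: only the name and id matter here
    record Named(String name, long id) {}

    // Sorts a copy with the same comparator as the quoted selector code
    static List<Named> sortedByName(List<Named> input) {
        List<Named> copy = new ArrayList<>(input);
        copy.sort(Comparator.comparing(Named::name, String::compareTo));
        return copy;
    }

    public static void main(String[] args) {
        // Insertion order: c1/0, c0/9, c1/1, c1/2
        List<Named> added = List.of(
                new Named("c1", 0), new Named("c0", 9),
                new Named("c1", 1), new Named("c1", 2));
        // Prints c0/9, c1/0, c1/1, c1/2: the equal "c1" names keep their insertion order
        for (Named n : sortedByName(added)) {
            System.out.println(n.name() + "/" + n.id());
        }
    }
}
```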

So, as you say, each partition's selector could be different.
However, if the producer sets a partition key and uses the RoundRobinPartition or SinglePartition routing mode, messages with a given key are always sent to the same partition. If that is correct, there is no issue with the per-key ordering guarantee.
cf. https://pulsar.apache.org/docs/3.3.x/concepts-messaging/#ordering-guarantee

@equanz
Contributor Author

equanz commented Sep 23, 2024

> I don't think it will cause an out-of-order issue.
> #23315 (comment)

> The result of this bug is that the target consumer will switch also for existing consumers in certain cases.
> #23315 (comment)

> One consumer joining will change the key assignment for many other consumers.
> #23315 (comment)

(Thank you, @lhotari!)
That is correct. My concern is that the range moves between existing consumers. As I understand it, the dispatcher does not currently handle this case.

@lhotari
Member

lhotari commented Sep 24, 2024
