diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index ced770e996..853e36359c 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -186,12 +186,34 @@ func (c *regexConsumer) Ack(msg Message) error { return c.AckID(msg.ID()) } -func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) { - c.log.Warnf("regexp consumer not support ReconsumeLater yet.") +func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) { + c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay) } -func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _ map[string]string, _ time.Duration) { - c.log.Warnf("regexp consumer not support ReconsumeLaterWithCustomProperties yet.") +func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string, + delay time.Duration) { + names, err := validateTopicNames(msg.Topic()) + if err != nil { + c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(), err) + return + } + if len(names) != 1 { + c.log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(), names) + return + } + + tn := names[0] + fqdnTopic := internal.TopicNameWithoutPartitionPart(tn) + consumer, ok := c.consumers[fqdnTopic] + if !ok { + // check to see if the topic with the partition part is in the consumers + // this can happen when the consumer is configured to consume from a specific partition + if consumer, ok = c.consumers[tn.Name]; !ok { + c.log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic()) + return + } + } + consumer.ReconsumeLaterWithCustomProperties(msg, customProperties, delay) } // AckID the consumption of a single message, identified by its MessageID @@ -454,6 +476,11 @@ func (c *regexConsumer) topics() ([]string, error) { } filtered := filterTopics(topics, c.pattern) + + if c.options.RetryEnable { + filtered = append(filtered, c.options.DLQ.RetryLetterTopic) + } + return filtered, nil } diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index e3bf9bfb81..688ec64867 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -2121,6 +2121,121 @@ func TestRLQMultiTopics(t *testing.T) { assert.Nil(t, checkMsg) } +func TestRLQRegex(t *testing.T) { + now := time.Now().Unix() + topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now) + topic02 := fmt.Sprintf("topic-%d-2", now) + topicPattern := fmt.Sprintf("topic-%d-.", now) + + subName := fmt.Sprintf("sub01-%d", time.Now().Unix()) + maxRedeliveries := 2 + N := 100 + ctx := context.Background() + + client, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer client.Close() + + rlqTopic := fmt.Sprintf("persistent://public/default/rlq-topic-%d-1", now) + dlqTopic := fmt.Sprintf("persistent://public/default/dlq-topic-%d-1", now) + + // subscribe regex topics with Retry Topics + rlqConsumer, err := client.Subscribe(ConsumerOptions{ + TopicsPattern: topicPattern, + SubscriptionName: subName, + Type: Shared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + DLQ: &DLQPolicy{ + MaxDeliveries: uint32(maxRedeliveries), + RetryLetterTopic: rlqTopic, + DeadLetterTopic: dlqTopic, + }, + RetryEnable: true, + NackRedeliveryDelay: 1 * time.Second, + }) + assert.Nil(t, err) + defer rlqConsumer.Close() + + // subscribe DLQ Topic + dlqConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: dlqTopic, + SubscriptionName: subName, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + AutoDiscoveryPeriod: 10 * time.Second, + }) + assert.Nil(t, err) + defer dlqConsumer.Close() + + // create multi producers + producer01, err := client.CreateProducer(ProducerOptions{Topic: topic01}) + assert.Nil(t, err) + defer producer01.Close() + + producer02, err := client.CreateProducer(ProducerOptions{Topic: topic02}) + assert.Nil(t, err) + defer producer02.Close() + + // 1. Pre-produce N messages for every topic + for i := 0; i < N; i++ { + _, err = producer01.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))}) + assert.Nil(t, err) + _, err = producer02.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_02_%d", i))}) + assert.Nil(t, err) + } + + // 2. Create consumer on the Retry Topics to reconsume 2*N messages (maxRedeliveries+1) times + rlqReceived := 0 + for rlqReceived < 2*N*(maxRedeliveries+1) { + msg, err := rlqConsumer.Receive(ctx) + assert.Nil(t, err) + rlqConsumer.ReconsumeLater(msg, 1*time.Second) + rlqReceived++ + } + assert.Equal(t, rlqReceived, 2*N*(maxRedeliveries+1)) + fmt.Println("retry consumed:", rlqReceived, 2*N*(maxRedeliveries+1)) // 600 + + // No more messages on the Retry Topic + rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer rlqCancel() + msg, err := rlqConsumer.Receive(rlqCtx) + assert.Error(t, err) + assert.Nil(t, msg) + + // 3. Create consumer on the DLQ topic to verify the routing + dlqReceived := 0 + for dlqReceived < 2*N { + msg, err := dlqConsumer.Receive(ctx) + assert.Nil(t, err) + dlqConsumer.Ack(msg) + dlqReceived++ + } + fmt.Println("dlq received:", dlqReceived) // 200 + assert.Equal(t, dlqReceived, 2*N) + + // No more messages on the DLQ Topic + dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer dlqCancel() + msg, err = dlqConsumer.Receive(dlqCtx) + assert.Error(t, err) + assert.Nil(t, msg) + + // 4. No more messages for same subscription + checkConsumer, err := client.Subscribe(ConsumerOptions{ + TopicsPattern: topicPattern, + SubscriptionName: subName, + Type: Shared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + defer checkConsumer.Close() + + timeoutCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + checkMsg, err := checkConsumer.Receive(timeoutCtx) + assert.Error(t, err) + assert.Nil(t, checkMsg) +} + func TestRLQSpecifiedPartitionTopic(t *testing.T) { topic := newTopicName() testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"