From 24da42d7f5d465f17f3be956d3afe632b4749a31 Mon Sep 17 00:00:00 2001 From: Nitin Goyal Date: Wed, 4 Jan 2023 11:28:49 +0530 Subject: [PATCH 1/4] Max Retry per msg feature added Signed-off-by: Nitin Goyal --- pulsar/consumer_impl.go | 9 ++++++++- pulsar/retry_router.go | 2 ++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index e0120ad53e..5c34b93081 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -561,7 +561,14 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customPropert msgID: msgID, }, } - if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries { + + maxDeliveries := c.dlq.policy.MaxDeliveries + if s, ok := props[MsgPropertyMaxReconsumeTimes]; ok { + md, _ := strconv.Atoi(s) + maxDeliveries = uint32(md) + } + + if uint32(reconsumeTimes) > maxDeliveries { c.dlq.Chan() <- consumerMsg } else { c.rlq.Chan() <- RetryMessage{ diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go index 7b5f6b8900..38a18e6c20 100644 --- a/pulsar/retry_router.go +++ b/pulsar/retry_router.go @@ -30,6 +30,8 @@ const ( RetryTopicSuffix = "-RETRY" MaxReconsumeTimes = 16 + MsgPropertyMaxReconsumeTimes = "MAX_RECONSUME_TIMES" + SysPropertyDelayTime = "DELAY_TIME" SysPropertyRealTopic = "REAL_TOPIC" SysPropertyRetryTopic = "RETRY_TOPIC" From 4040df84342500be7020b7d47bfdedb9a3b52b11 Mon Sep 17 00:00:00 2001 From: Nitin Goyal Date: Wed, 4 Jan 2023 11:50:59 +0530 Subject: [PATCH 2/4] Test cases added for custom retry per msg Signed-off-by: Nitin Goyal --- pulsar/consumer_test.go | 102 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 11e72a7240..e133e3671b 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1839,6 +1839,108 @@ func TestRLQWithCustomProperties(t *testing.T) { assert.Nil(t, checkMsg) } +func TestRLQWithCustomRetryPerMsg(t *testing.T) { + topic := newTopicName() + testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions" + makeHTTPCall(t, http.MethodPut, testURL, "3") + + subName := fmt.Sprintf("sub01-%d", time.Now().Unix()) + maxRedeliveries := 100 + N := 100 + ctx := context.Background() + + client, err := NewClient(ClientOptions{URL: lookupURL}) + assert.Nil(t, err) + defer client.Close() + + // 1. Pre-produce N messages + producer, err := client.CreateProducer(ProducerOptions{Topic: topic}) + assert.Nil(t, err) + defer producer.Close() + + for i := 0; i < N; i++ { + _, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("MESSAGE_%d", i)), + Properties: map[string]string{ + MsgPropertyMaxReconsumeTimes: string(i % maxRedeliveries), + }, + }) + assert.Nil(t, err) + } + + // 2. Create consumer on the Retry Topic to reconsume N messages + rlqConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + Type: Shared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + DLQ: &DLQPolicy{ + MaxDeliveries: uint32(maxRedeliveries), + }, + RetryEnable: true, + NackRedeliveryDelay: 1 * time.Second, + }) + assert.Nil(t, err) + defer rlqConsumer.Close() + + rlqReceived := 0 + for rlqReceived < (N/maxRedeliveries)*(maxRedeliveries*(maxRedeliveries+1)/2) { + msg, err := rlqConsumer.Receive(ctx) + assert.Nil(t, err) + rlqConsumer.ReconsumeLater(msg, 1*time.Second) + rlqReceived++ + } + fmt.Println("retry consumed:", rlqReceived) // 4950 + + // No more messages on the Retry Topic + rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer rlqCancel() + msg, err := rlqConsumer.Receive(rlqCtx) + assert.Error(t, err) + assert.Nil(t, msg) + + // 3. Create consumer on the DLQ topic to verify the routing + dlqConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: "persistent://public/default/" + topic + "-" + subName + "-DLQ", + SubscriptionName: subName, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + defer dlqConsumer.Close() + + dlqReceived := 0 + for dlqReceived < N { + msg, err := dlqConsumer.Receive(ctx) + assert.Nil(t, err) + dlqConsumer.Ack(msg) + dlqReceived++ + } + fmt.Println("dlq received:", dlqReceived) // 100 + + // No more messages on the DLQ Topic + dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer dlqCancel() + msg, err = dlqConsumer.Receive(dlqCtx) + assert.Error(t, err) + assert.Nil(t, msg) + + // 4. No more messages for same subscription + checkConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: subName, + Type: Shared, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + defer checkConsumer.Close() + + checkCtx, checkCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer checkCancel() + checkMsg, err := checkConsumer.Receive(checkCtx) + assert.Error(t, err) + assert.Nil(t, checkMsg) +} + func TestAckWithResponse(t *testing.T) { now := time.Now().Unix() topic01 := fmt.Sprintf("persistent://public/default/topic-%d-01", now) From 65a9d20ffdd80158bde4d9afe7c3a55ed2f40f1a Mon Sep 17 00:00:00 2001 From: Nitin Goyal Date: Wed, 4 Jan 2023 11:56:21 +0530 Subject: [PATCH 3/4] ci fix Signed-off-by: Nitin Goyal --- pulsar/consumer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index e133e3671b..1cd9b24437 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1862,7 +1862,7 @@ func TestRLQWithCustomRetryPerMsg(t *testing.T) { _, err = producer.Send(ctx, &ProducerMessage{ Payload: []byte(fmt.Sprintf("MESSAGE_%d", i)), Properties: map[string]string{ - MsgPropertyMaxReconsumeTimes: string(i % maxRedeliveries), + MsgPropertyMaxReconsumeTimes: fmt.Sprintf("%d", i%maxRedeliveries), }, }) assert.Nil(t, err) From 79936ec0062591e56498e2597af8aabf3875f8f3 Mon Sep 17 00:00:00 2001 From: Nitin Goyal Date: Wed, 4 Jan 2023 12:12:49 +0530 Subject: [PATCH 4/4] ci fix Signed-off-by: Nitin Goyal --- pulsar/consumer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 1cd9b24437..f2e3d2a4f7 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1890,7 +1890,7 @@ func TestRLQWithCustomRetryPerMsg(t *testing.T) { rlqConsumer.ReconsumeLater(msg, 1*time.Second) rlqReceived++ } - fmt.Println("retry consumed:", rlqReceived) // 4950 + fmt.Println("retry consumed:", rlqReceived) // 5050 // No more messages on the Retry Topic rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)