From 4400dcdc9a557ad84cc13ee8588701731bcbf9a0 Mon Sep 17 00:00:00 2001 From: kenny Date: Fri, 30 Aug 2024 18:02:08 +0800 Subject: [PATCH] feat: introduce ack trackers --- pulsar/consumer_partition.go | 40 +++++++++++++++++++ pulsar/consumer_partition_test.go | 64 +++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index d8001dc122..931e25b26d 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -178,6 +178,7 @@ type partitionConsumer struct { chunkedMsgCtxMap *chunkedMsgCtxMap unAckChunksTracker *unAckChunksTracker ackGroupingTracker ackGroupingTracker + ackTrackers *ackTrackers lastMessageInBroker *trackingMessageID @@ -375,6 +376,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon pc.decryptor = decryptor pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log) + pc.ackTrackers = newAckTrackers() err := pc.grabConn("") if err != nil { @@ -443,6 +445,9 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn } trackingID := toTrackingMessageID(msgID) + if trackingID != nil && trackingID.tracker == nil { + trackingID.tracker = pc.ackTrackers.tracker(trackingID) + } if trackingID != nil && trackingID.ack() { // All messages in the same batch have been acknowledged, we only need to acknowledge the @@ -453,6 +458,7 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn entryID: trackingID.entryID, }, } + pc.ackTrackers.remove(trackingID) pc.metrics.AcksCounter.Inc() pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9) } else if !pc.options.enableBatchIndexAck { @@ -712,6 +718,9 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon if trackingID == nil { return errors.New("failed to convert trackingMessageID") } + if trackingID.tracker == nil { + trackingID.tracker = pc.ackTrackers.tracker(trackingID) + } var msgIDToAck *trackingMessageID if trackingID.ackCumulative() || pc.options.enableBatchIndexAck { @@ -725,6 +734,7 @@ func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withRespon return nil } + pc.ackTrackers.remove(msgIDToAck) pc.metrics.AcksCounter.Inc() pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-trackingID.receivedTime.UnixNano()) / 1.0e9) @@ -1162,6 +1172,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header ackTracker) // set the consumer so we know how to ack the message id trackingMsgID.consumer = pc + pc.ackTrackers.add(trackingMsgID, ackTracker) if pc.messageShouldBeDiscarded(trackingMsgID) { pc.AckID(trackingMsgID) @@ -2366,3 +2377,32 @@ func (u *unAckChunksTracker) nack(cmid *chunkMessageID) { } u.remove(cmid) } + +type ackTrackers struct { + mu sync.RWMutex + trackers map[[2]int64]*ackTracker +} + +func newAckTrackers() *ackTrackers { + return &ackTrackers{ + trackers: make(map[[2]int64]*ackTracker), + } +} + +func (a *ackTrackers) tracker(id MessageID) *ackTracker { + a.mu.RLock() + defer a.mu.RUnlock() + return a.trackers[[2]int64{id.LedgerID(), id.EntryID()}] +} + +func (a *ackTrackers) add(id MessageID, tracker *ackTracker) { + a.mu.Lock() + defer a.mu.Unlock() + a.trackers[[2]int64{id.LedgerID(), id.EntryID()}] = tracker +} + +func (a *ackTrackers) remove(id MessageID) { + a.mu.Lock() + defer a.mu.Unlock() + delete(a.trackers, [2]int64{id.LedgerID(), id.EntryID()}) +} diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go index 21280fb781..247f3c95eb 100644 --- a/pulsar/consumer_partition_test.go +++ b/pulsar/consumer_partition_test.go @@ -36,6 +36,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) { options: &partitionConsumerOpts{}, metrics: newTestMetrics(), decryptor: crypto.NewNoopDecryptor(), + ackTrackers: newAckTrackers(), } pc.availablePermits = &availablePermits{pc: &pc} pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0}, @@ -75,6 +76,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) { options: &partitionConsumerOpts{}, metrics: newTestMetrics(), decryptor: crypto.NewNoopDecryptor(), + ackTrackers: newAckTrackers(), } pc.availablePermits = &availablePermits{pc: &pc} pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0}, @@ -111,6 +113,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) { options: &partitionConsumerOpts{}, metrics: newTestMetrics(), decryptor: crypto.NewNoopDecryptor(), + ackTrackers: newAckTrackers(), } pc.availablePermits = &availablePermits{pc: &pc} pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0}, @@ -150,6 +153,67 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) { } } +func TestBatchMessageIDWithAckTrackers(t *testing.T) { + eventsCh := make(chan interface{}, 1) + pc := partitionConsumer{ + queueCh: make(chan []*message, 1), + eventsCh: eventsCh, + compressionProviders: sync.Map{}, + options: &partitionConsumerOpts{}, + metrics: newTestMetrics(), + decryptor: crypto.NewNoopDecryptor(), + ackTrackers: newAckTrackers(), + } + pc.availablePermits = &availablePermits{pc: &pc} + pc.ackGroupingTracker = newAckGroupingTracker(&AckGroupingOptions{MaxSize: 0}, + func(id MessageID) { pc.sendIndividualAck(id) }, nil, nil) + + headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10) + if err := pc.MessageReceived(nil, headersAndPayload); err != nil { + t.Fatal(err) + } + + // ensure the tracker was set on the message id + messages := <-pc.queueCh + for _, m := range messages { + assert.NotNil(t, m.ID().(*trackingMessageID).tracker) + } + + noAckTrackerMessages := make([]MessageID, 10) + for i, m := range messages { + tmp := m.ID().Serialize() + mid, err := DeserializeMessageID(tmp) + if err != nil { + t.Fatal(err) + } + noAckTrackerMessages[i] = mid + } + + // ack all message ids except the last one + for i := 0; i < 9; i++ { + _, ok := noAckTrackerMessages[i].(*trackingMessageID) + assert.False(t, ok) + err := pc.AckID(noAckTrackerMessages[i]) + assert.Nil(t, err) + } + + select { + case <-eventsCh: + t.Error("The message id should not be acked!") + default: + } + + // ack last message + err := pc.AckID(noAckTrackerMessages[9]) + assert.Nil(t, err) + + select { + case <-eventsCh: + default: + t.Error("Expected an ack request to be triggered!") + } +} + // Raw single message in old format // metadata properties: properties: // payload = "hello"