Skip to content

Commit

Permalink
Fix zero queue consumer and table view reader
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Feb 26, 2025
1 parent 29cb448 commit effd9c7
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
1 change: 1 addition & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {

if options.EnableZeroQueueConsumer {
options.ReceiverQueueSize = 0
options.StartMessageIDInclusive = true
}

if options.Interceptors == nil {
Expand Down
10 changes: 6 additions & 4 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2305,12 +2305,14 @@ func convertToMessageID(id *pb.MessageIdData) *trackingMessageID {

msgID := &trackingMessageID{
messageID: &messageID{
ledgerID: int64(*id.LedgerId),
entryID: int64(*id.EntryId),
ledgerID: int64(id.GetLedgerId()),
entryID: int64(id.GetEntryId()),
batchIdx: -1,
batchSize: id.GetBatchSize(),
},
}
if id.BatchIndex != nil {
msgID.batchIdx = *id.BatchIndex
if id.GetBatchSize() > 1 {
msgID.batchIdx = id.GetBatchIndex()
}

return msgID
Expand Down
1 change: 0 additions & 1 deletion pulsar/consumer_zero_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,6 @@ func TestZeroQueueConsumer_Seek(t *testing.T) {
Topic: topicName,
EnableZeroQueueConsumer: true,
SubscriptionName: "sub-1",
StartMessageIDInclusive: true,
})
assert.Nil(t, err)
_, ok := consumer.(*zeroQueueConsumer)
Expand Down

0 comments on commit effd9c7

Please sign in to comment.