From ff6ddd6d07ffc07274313262a4cb223c6505d234 Mon Sep 17 00:00:00 2001 From: gunli Date: Thu, 27 Feb 2025 16:50:13 +0800 Subject: [PATCH] revert pendingItem.done() to its old position --- pulsar/producer_partition.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b9436fd54..f6523124c 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -848,21 +848,6 @@ type pendingItem struct { flushCallback func(err error) } -func (i *pendingItem) done(err error) { - if i.isDone { - return - } - i.isDone = true - buffersPool.Put(i.buffer) - if i.flushCallback != nil { - i.flushCallback(err) - } - - if i.cancel != nil { - i.cancel() - } -} - func (p *partitionProducer) internalFlushCurrentBatch() { if p.batchBuilder == nil { // batch is not enabled @@ -1752,6 +1737,21 @@ type flushRequest struct { err error } +func (i *pendingItem) done(err error) { + if i.isDone { + return + } + i.isDone = true + buffersPool.Put(i.buffer) + if i.flushCallback != nil { + i.flushCallback(err) + } + + if i.cancel != nil { + i.cancel() + } +} + // _setConn sets the internal connection field of this partition producer atomically. // Note: should only be called by this partition producer when a new connection is available. func (p *partitionProducer) _setConn(conn internal.Connection) {