diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b9436fd54..f6523124c 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -848,21 +848,6 @@ type pendingItem struct { flushCallback func(err error) } -func (i *pendingItem) done(err error) { - if i.isDone { - return - } - i.isDone = true - buffersPool.Put(i.buffer) - if i.flushCallback != nil { - i.flushCallback(err) - } - - if i.cancel != nil { - i.cancel() - } -} - func (p *partitionProducer) internalFlushCurrentBatch() { if p.batchBuilder == nil { // batch is not enabled @@ -1752,6 +1737,21 @@ type flushRequest struct { err error } +func (i *pendingItem) done(err error) { + if i.isDone { + return + } + i.isDone = true + buffersPool.Put(i.buffer) + if i.flushCallback != nil { + i.flushCallback(err) + } + + if i.cancel != nil { + i.cancel() + } +} + // _setConn sets the internal connection field of this partition producer atomically. // Note: should only be called by this partition producer when a new connection is available. func (p *partitionProducer) _setConn(conn internal.Connection) {