diff --git a/exporter/exporterhelper/internal/queuebatch/default_batcher.go b/exporter/exporterhelper/internal/queuebatch/default_batcher.go index 061e7570d64..af61e0b8ad6 100644 --- a/exporter/exporterhelper/internal/queuebatch/default_batcher.go +++ b/exporter/exporterhelper/internal/queuebatch/default_batcher.go @@ -16,9 +16,10 @@ import ( ) type batch struct { - ctx context.Context - req request.Request - done multiDone + ctx context.Context + req request.Request + done multiDone + created time.Time } type batcherSettings[T any] struct { @@ -38,7 +39,7 @@ type defaultBatcher struct { stopWG sync.WaitGroup currentBatchMu sync.Mutex currentBatch *batch - timer *time.Timer + ticker *time.Ticker shutdownCh chan struct{} } @@ -62,12 +63,6 @@ func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) } } -func (qb *defaultBatcher) resetTimer() { - if qb.cfg.FlushTimeout > 0 { - qb.timer.Reset(qb.cfg.FlushTimeout) - } -} - func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done Done) { qb.currentBatchMu.Lock() @@ -91,11 +86,11 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done // Do not flush the last item and add it to the current batch. reqList = reqList[:len(reqList)-1] qb.currentBatch = &batch{ - ctx: ctx, - req: lastReq, - done: multiDone{done}, + ctx: ctx, + req: lastReq, + done: multiDone{done}, + created: time.Now(), } - qb.resetTimer() } qb.currentBatchMu.Unlock() @@ -146,11 +141,11 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done // Do not flush the last item and add it to the current batch. reqList = reqList[:len(reqList)-1] qb.currentBatch = &batch{ - ctx: ctx, - req: lastReq, - done: multiDone{done}, + ctx: ctx, + req: lastReq, + done: multiDone{done}, + created: time.Now(), } - qb.resetTimer() } } @@ -172,8 +167,8 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() { select { case <-qb.shutdownCh: return - case <-qb.timer.C: - qb.flushCurrentBatchIfNecessary() + case <-qb.ticker.C: + qb.flushCurrentBatchIfNecessary(false) } } }() @@ -182,7 +177,7 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() { // Start starts the goroutine that reads from the queue and flushes asynchronously. func (qb *defaultBatcher) Start(_ context.Context, _ component.Host) error { if qb.cfg.FlushTimeout > 0 { - qb.timer = time.NewTimer(qb.cfg.FlushTimeout) + qb.ticker = time.NewTicker(qb.cfg.FlushTimeout) qb.startTimeBasedFlushingGoroutine() } @@ -190,19 +185,22 @@ func (qb *defaultBatcher) Start(_ context.Context, _ component.Host) error { } // flushCurrentBatchIfNecessary sends out the current request batch if it is not nil -func (qb *defaultBatcher) flushCurrentBatchIfNecessary() { +func (qb *defaultBatcher) flushCurrentBatchIfNecessary(forceFlush bool) { qb.currentBatchMu.Lock() if qb.currentBatch == nil { qb.currentBatchMu.Unlock() return } + if !forceFlush && time.Since(qb.currentBatch.created) < qb.cfg.FlushTimeout { + qb.currentBatchMu.Unlock() + return + } batchToFlush := qb.currentBatch qb.currentBatch = nil qb.currentBatchMu.Unlock() // flush() blocks until successfully started a goroutine for flushing. qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.done) - qb.resetTimer() } // flush starts a goroutine that calls consumeFunc. It blocks until a worker is available if necessary. @@ -224,7 +222,7 @@ func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done D func (qb *defaultBatcher) Shutdown(_ context.Context) error { close(qb.shutdownCh) // Make sure execute one last flush if necessary. - qb.flushCurrentBatchIfNecessary() + qb.flushCurrentBatchIfNecessary(true) qb.stopWG.Wait() return nil } diff --git a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go index 13b7ea4801b..1a2a21dcd23 100644 --- a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go +++ b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go @@ -590,8 +590,8 @@ func TestQueueBatchTimerFlush(t *testing.T) { assert.LessOrEqual(t, 1, sink.RequestsCount()) assert.Equal(t, 8, sink.ItemsCount()) - // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) - time.Sleep(50 * time.Millisecond) + // Confirm that it is flushed after 100ms (using 60+100=160 here to be safe) + time.Sleep(100 * time.Millisecond) assert.LessOrEqual(t, 2, sink.RequestsCount()) assert.Equal(t, 12, sink.ItemsCount()) require.NoError(t, qb.Shutdown(context.Background()))