Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 22 additions & 24 deletions exporter/exporterhelper/internal/queuebatch/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
)

type batch struct {
ctx context.Context
req request.Request
done multiDone
ctx context.Context
req request.Request
done multiDone
created time.Time
}

type batcherSettings[T any] struct {
Expand All @@ -38,7 +39,7 @@ type defaultBatcher struct {
stopWG sync.WaitGroup
currentBatchMu sync.Mutex
currentBatch *batch
timer *time.Timer
ticker *time.Ticker
shutdownCh chan struct{}
}

Expand All @@ -62,12 +63,6 @@ func newDefaultBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request])
}
}

func (qb *defaultBatcher) resetTimer() {
if qb.cfg.FlushTimeout > 0 {
qb.timer.Reset(qb.cfg.FlushTimeout)
}
}

func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done Done) {
qb.currentBatchMu.Lock()

Expand All @@ -91,11 +86,11 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
req: lastReq,
done: multiDone{done},
ctx: ctx,
req: lastReq,
done: multiDone{done},
created: time.Now(),
}
qb.resetTimer()
}

qb.currentBatchMu.Unlock()
Expand Down Expand Up @@ -146,11 +141,11 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
// Do not flush the last item and add it to the current batch.
reqList = reqList[:len(reqList)-1]
qb.currentBatch = &batch{
ctx: ctx,
req: lastReq,
done: multiDone{done},
ctx: ctx,
req: lastReq,
done: multiDone{done},
created: time.Now(),
}
qb.resetTimer()
}
}

Expand All @@ -172,8 +167,8 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
select {
case <-qb.shutdownCh:
return
case <-qb.timer.C:
qb.flushCurrentBatchIfNecessary()
case <-qb.ticker.C:
qb.flushCurrentBatchIfNecessary(false)
}
}
}()
Expand All @@ -182,27 +177,30 @@ func (qb *defaultBatcher) startTimeBasedFlushingGoroutine() {
// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *defaultBatcher) Start(_ context.Context, _ component.Host) error {
if qb.cfg.FlushTimeout > 0 {
qb.timer = time.NewTimer(qb.cfg.FlushTimeout)
qb.ticker = time.NewTicker(qb.cfg.FlushTimeout)
qb.startTimeBasedFlushingGoroutine()
}

return nil
}

// flushCurrentBatchIfNecessary sends out the current request batch if it is not nil
func (qb *defaultBatcher) flushCurrentBatchIfNecessary() {
func (qb *defaultBatcher) flushCurrentBatchIfNecessary(forceFlush bool) {
qb.currentBatchMu.Lock()
if qb.currentBatch == nil {
qb.currentBatchMu.Unlock()
return
}
if !forceFlush && time.Since(qb.currentBatch.created) < qb.cfg.FlushTimeout {
qb.currentBatchMu.Unlock()
return
}
Comment on lines +194 to +197
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mean that we no longer do flush every FlushTimeout, but somewhere between [FlushTimeout, 2*FlushTimeout]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Is changing to a Ticker needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean.
I was trying to use a single time keeper for all batches, but if we want to control the flush timeout more precisely, we could keep a separate timer for each batch instead

batchToFlush := qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()

// flush() blocks until successfully started a goroutine for flushing.
qb.flush(batchToFlush.ctx, batchToFlush.req, batchToFlush.done)
qb.resetTimer()
}

// flush starts a goroutine that calls consumeFunc. It blocks until a worker is available if necessary.
Expand All @@ -224,7 +222,7 @@ func (qb *defaultBatcher) flush(ctx context.Context, req request.Request, done D
func (qb *defaultBatcher) Shutdown(_ context.Context) error {
close(qb.shutdownCh)
// Make sure execute one last flush if necessary.
qb.flushCurrentBatchIfNecessary()
qb.flushCurrentBatchIfNecessary(true)
qb.stopWG.Wait()
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,8 @@ func TestQueueBatchTimerFlush(t *testing.T) {
assert.LessOrEqual(t, 1, sink.RequestsCount())
assert.Equal(t, 8, sink.ItemsCount())

// Confirm that it is flushed after 100ms (using 60+50=110 here to be safe)
time.Sleep(50 * time.Millisecond)
// Confirm that it is flushed after 100ms (using 60+100=160 here to be safe)
time.Sleep(100 * time.Millisecond)
assert.LessOrEqual(t, 2, sink.RequestsCount())
assert.Equal(t, 12, sink.ItemsCount())
require.NoError(t, qb.Shutdown(context.Background()))
Expand Down
Loading