Skip to content

Commit da4b35d

Browse files
authored
flushing objects in chunks (#22319)
- Process appendable objects in controlled chunks to maintain stable memory usage. - A chunk means 400 appendable data objects Approved by: @XuPeng-SH
1 parent 7741fe5 commit da4b35d

File tree

1 file changed

+57
-19
lines changed

1 file changed

+57
-19
lines changed

pkg/vm/engine/tae/db/checkpoint/flusher.go

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -540,30 +540,68 @@ func (flusher *flushImpl) fireFlushTabletail(
540540
return nil
541541
}
542542

543-
// freeze all append
544-
scopes := make([]common.ID, 0, len(metas))
545-
for _, meta := range metas {
546-
if !meta.GetObjectData().PrepareCompact() {
547-
logutil.Info("[FlushTabletail] data prepareCompact false", zap.String("table", tableDesc), zap.String("obj", meta.ID().String()))
548-
return moerr.GetOkExpectedEOB()
549-
}
550-
scopes = append(scopes, *meta.AsCommonID())
543+
if len(metas) == 0 && len(tombstoneMetas) > 0 {
544+
metas = append(metas, nil) // make it a non-empty chunk
551545
}
552-
for _, meta := range tombstoneMetas {
553-
if !meta.GetObjectData().PrepareCompact() {
554-
logutil.Info("[FlushTabletail] tomb prepareCompact false", zap.String("table", tableDesc), zap.String("obj", meta.ID().String()))
555-
return moerr.GetOkExpectedEOB()
546+
547+
metaChunks := slices.Chunk(metas, 400)
548+
firstChunk := true
549+
550+
nextChunk:
551+
for chunk := range metaChunks {
552+
if len(chunk) == 1 && chunk[0] == nil {
553+
chunk = chunk[:0] // remove the placeholder
554+
}
555+
scopes := make([]common.ID, 0, len(chunk))
556+
for _, meta := range chunk {
557+
if !meta.GetObjectData().PrepareCompact() {
558+
logutil.Info("[FlushTabletail] data prepareCompact false",
559+
zap.String("table", tableDesc),
560+
zap.String("obj", meta.ID().String()),
561+
)
562+
break nextChunk
563+
}
564+
scopes = append(scopes, *meta.AsCommonID())
565+
}
566+
if firstChunk {
567+
for _, meta := range tombstoneMetas {
568+
if !meta.GetObjectData().PrepareCompact() {
569+
logutil.Info("[FlushTabletail] tomb prepareCompact false",
570+
zap.String("table", tableDesc),
571+
zap.String("obj", meta.ID().String()),
572+
)
573+
// As we treat tombstone as a monolithic chunk,
574+
// skip this fire and wait for the next run if freeze fails.
575+
return moerr.GetOkExpectedEOB()
576+
}
577+
scopes = append(scopes, *meta.AsCommonID())
578+
}
579+
}
580+
var factory tasks.TxnTaskFactory
581+
if firstChunk {
582+
factory = jobs.FlushTableTailTaskFactory(chunk, tombstoneMetas, flusher.rt)
583+
firstChunk = false
584+
} else {
585+
logutil.Info("[FlushTabletail] fire chunk",
586+
zap.String("table", tableDesc),
587+
zap.Int("chunk-size", len(chunk)),
588+
)
589+
factory = jobs.FlushTableTailTaskFactory(chunk, nil, flusher.rt)
556590
}
557-
scopes = append(scopes, *meta.AsCommonID())
558-
}
559591

560-
factory := jobs.FlushTableTailTaskFactory(metas, tombstoneMetas, flusher.rt)
561-
if _, err := flusher.rt.Scheduler.ScheduleMultiScopedTxnTask(nil, tasks.FlushTableTailTask, scopes, factory); err != nil {
562-
if err != tasks.ErrScheduleScopeConflict {
563-
logutil.Error("[FlushTabletail] Sched Failure", zap.String("table", tableDesc), zap.Error(err))
592+
_, err := flusher.rt.Scheduler.ScheduleMultiScopedTxnTask(
593+
nil, tasks.FlushTableTailTask, scopes, factory)
594+
if err != nil {
595+
if err != tasks.ErrScheduleScopeConflict {
596+
logutil.Error("[FlushTabletail] Sched Failure",
597+
zap.String("table", tableDesc),
598+
zap.Error(err),
599+
)
600+
}
601+
return moerr.GetOkExpectedEOB()
564602
}
565-
return moerr.GetOkExpectedEOB()
566603
}
604+
567605
return nil
568606
}
569607

0 commit comments

Comments
 (0)