Skip to content

Commit

Permalink
Merge pull request #64 from naveen246/refactor-compaction
Browse files Browse the repository at this point in the history
Refactor compaction
  • Loading branch information
thrawn01 authored Feb 5, 2025
2 parents f333b92 + f49f936 commit caab2d1
Show file tree
Hide file tree
Showing 15 changed files with 1,023 additions and 1,018 deletions.
6 changes: 0 additions & 6 deletions .golangci.yml

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package compaction
package compacted

import (
"bytes"
Expand Down
139 changes: 139 additions & 0 deletions slatedb/compaction/compactor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package compaction

import (
"strconv"

"github.com/oklog/ulid/v2"
"github.com/samber/mo"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/slatedb/compacted"
"github.com/slatedb/slatedb-go/slatedb/config"
"github.com/slatedb/slatedb-go/slatedb/store"
)

type Scheduler interface {
maybeScheduleCompaction(state *CompactorState) []Compaction
}

type CompactorMainMsg int

const (
CompactorShutdown CompactorMainMsg = iota + 1
)

type Result struct {
SortedRun *compacted.SortedRun
Error error
}

// ------------------------------------------------
// SourceID
// ------------------------------------------------

type SourceIDType int

const (
SortedRunID SourceIDType = iota + 1
SSTID
)

type SourceID struct {
typ SourceIDType
value string
}

func NewSourceIDSST(id ulid.ULID) SourceID {
return SourceID{
typ: SSTID,
value: id.String(),
}
}

func (s SourceID) SortedRunID() mo.Option[uint32] {
if s.typ != SortedRunID {
return mo.None[uint32]()
}
val, err := strconv.Atoi(s.value)
if err != nil {
return mo.None[uint32]()
}
return mo.Some(uint32(val))
}

func (s SourceID) SstID() mo.Option[ulid.ULID] {
if s.typ != SSTID {
return mo.None[ulid.ULID]()
}
val, err := ulid.Parse(s.value)
if err != nil {
return mo.None[ulid.ULID]()
}
return mo.Some(val)
}

// ------------------------------------------------
// Compaction
// ------------------------------------------------

type Status int

const (
Submitted Status = iota + 1
InProgress
)

type Compaction struct {
Status Status
sources []SourceID
destination uint32
}

func NewCompaction(sources []SourceID, destination uint32) Compaction {
return Compaction{
Status: Submitted,
sources: sources,
destination: destination,
}
}

// Compactor - The Orchestrator checks with the Scheduler if Level0 needs to be compacted.
// If compaction is needed, the Orchestrator gives Jobs to the Executor.
// The Executor creates new goroutine for each Job and the results are written to a channel.
type Compactor struct {
orchestrator *Orchestrator
}

func NewCompactor(manifestStore *store.ManifestStore, tableStore *store.TableStore, opts config.DBOptions) (*Compactor, error) {
orchestrator, err := spawnAndRunCompactionOrchestrator(manifestStore, tableStore, opts)
if err != nil {
return nil, err
}

return &Compactor{
orchestrator: orchestrator,
}, nil
}

func (c *Compactor) Close() {
c.orchestrator.shutdown()
}

func spawnAndRunCompactionOrchestrator(
manifestStore *store.ManifestStore,
tableStore *store.TableStore,
opts config.DBOptions,
) (*Orchestrator, error) {
orchestrator, err := NewOrchestrator(opts, manifestStore, tableStore)
if err != nil {
return nil, err
}

orchestrator.spawnLoop(opts)
return orchestrator, nil
}

type Job struct {
destination uint32
sstList []sstable.Handle
sortedRuns []compacted.SortedRun
}
178 changes: 178 additions & 0 deletions slatedb/compaction/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package compaction

import (
"context"
"sync"
"sync/atomic"

"github.com/oklog/ulid/v2"
"github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/iter"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/internal/types"
"github.com/slatedb/slatedb-go/slatedb/compacted"
"github.com/slatedb/slatedb-go/slatedb/config"
"github.com/slatedb/slatedb-go/slatedb/store"
)

type Executor struct {
options *config.CompactorOptions
tableStore *store.TableStore

resultCh chan Result
tasksWG sync.WaitGroup
stopped atomic.Bool
}

func newExecutor(
options *config.CompactorOptions,
tableStore *store.TableStore,
) *Executor {
return &Executor{
options: options,
tableStore: tableStore,
resultCh: make(chan Result, 1),
}
}

func (e *Executor) nextCompactionResult() (Result, bool) {
select {
case result := <-e.resultCh:
return result, true
default:
return Result{}, false
}
}

// create an iterator for CompactionJob.sstList and another iterator for CompactionJob.sortedRuns
// Return the merged iterator for the above 2 iterators
func (e *Executor) loadIterators(compaction Job) (iter.KVIterator, error) {
assert.True(
!(len(compaction.sstList) == 0 && len(compaction.sortedRuns) == 0),
"Compaction sources cannot be empty",
)

l0Iters := make([]iter.KVIterator, 0)
for _, sst := range compaction.sstList {
sstIter, err := sstable.NewIterator(&sst, e.tableStore.Clone())
if err != nil {
return nil, err
}
l0Iters = append(l0Iters, sstIter)
}

srIters := make([]iter.KVIterator, 0)
for _, sr := range compaction.sortedRuns {
srIter, err := compacted.NewSortedRunIterator(sr, e.tableStore.Clone())
if err != nil {
return nil, err
}
srIters = append(srIters, srIter)
}

ctx := context.TODO()
var l0MergeIter, srMergeIter iter.KVIterator
if len(compaction.sortedRuns) == 0 {
l0MergeIter = iter.NewMergeSort(ctx, l0Iters...)
return l0MergeIter, nil
} else if len(compaction.sstList) == 0 {
srMergeIter = iter.NewMergeSort(ctx, srIters...)
return srMergeIter, nil
}

it := iter.NewMergeSort(ctx, l0MergeIter, srMergeIter)
return it, nil
}

func (e *Executor) executeCompaction(compaction Job) (*compacted.SortedRun, error) {
allIter, err := e.loadIterators(compaction)
if err != nil {
return nil, err
}
var warn types.ErrWarn

outputSSTs := make([]sstable.Handle, 0)
currentWriter := e.tableStore.TableWriter(sstable.NewIDCompacted(ulid.Make()))
currentSize := 0
for {
kv, ok := allIter.NextEntry(context.TODO())
if !ok {
if w := allIter.Warnings(); w != nil {
warn.Merge(w)
}
break
}

value := kv.Value.GetValue()
err = currentWriter.Add(kv.Key, value)
if err != nil {
return nil, err
}

currentSize += len(kv.Key)
if value.IsPresent() {
val, _ := value.Get()
currentSize += len(val)
}

if uint64(currentSize) > e.options.MaxSSTSize {
currentSize = 0
finishedWriter := currentWriter
currentWriter = e.tableStore.TableWriter(sstable.NewIDCompacted(ulid.Make()))
sst, err := finishedWriter.Close()
if err != nil {
return nil, err
}
outputSSTs = append(outputSSTs, *sst)
}
}
if currentSize > 0 {
sst, err := currentWriter.Close()
if err != nil {
return nil, err
}
outputSSTs = append(outputSSTs, *sst)
}
return &compacted.SortedRun{
ID: compaction.destination,
SSTList: outputSSTs,
}, warn.If()
}

func (e *Executor) startCompaction(compaction Job) {
if e.isStopped() {
return
}

e.tasksWG.Add(1)
go func() {
defer e.tasksWG.Done()

if e.isStopped() {
return
}

var result Result
sortedRun, err := e.executeCompaction(compaction)
if err != nil {
// TODO(thrawn01): log the error somewhere.
result = Result{Error: err}
} else if sortedRun != nil {
result = Result{SortedRun: sortedRun}
}
e.resultCh <- result
}()
}

func (e *Executor) stop() {
e.stopped.Store(true)
e.waitForTasksCompletion()
}

func (e *Executor) waitForTasksCompletion() {
e.tasksWG.Wait()
}

func (e *Executor) isStopped() bool {
return e.stopped.Load()
}
Loading

0 comments on commit caab2d1

Please sign in to comment.