Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add new compactor based revision count #16427

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ var (
// revision 5000 when the current revision is 6000.
// This runs every 5-minute if enough of logs have proceeded.
CompactorModeRevision = v3compactor.ModeRevision

// CompactorModeRevisionThreshold is revision-change-based compaction
// mode for "Config.AutoCompactionMode" field.
// If "AutoCompactionMode" is CompactorModeRevisionThreshold and
// "AutoCompactionRetention" is "10000", it compacts log on revision
// 10000 when the current revision is 20000. This runs if there are
// more than 10000 revisions have occurred since previous compaction.
CompactorModeRevisionThreshold = v3compactor.ModeRevisionThreshold
)

func init() {
Expand Down Expand Up @@ -775,7 +783,7 @@ func (cfg *Config) Validate() error {
}

switch cfg.AutoCompactionMode {
case CompactorModeRevision, CompactorModePeriodic:
case CompactorModeRevision, CompactorModePeriodic, CompactorModeRevisionThreshold:
case "":
return errors.New("undefined auto-compaction-mode")
default:
Expand Down
2 changes: 2 additions & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,8 @@ func parseCompactionRetention(mode, retention string) (ret time.Duration, err er
ret = time.Duration(int64(h))
case CompactorModePeriodic:
ret = time.Duration(int64(h)) * time.Hour
case CompactorModeRevisionThreshold:
ret = time.Duration(int64(h))
case "":
return 0, errors.New("--auto-compaction-mode is undefined")
}
Expand Down
19 changes: 14 additions & 5 deletions server/etcdserver/api/v3compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import (
)

const (
ModePeriodic = "periodic"
ModeRevision = "revision"
ModePeriodic = "periodic"
ModeRevision = "revision"
ModeRevisionThreshold = "revision-threshold"
)

// Compactor purges old log from the storage periodically.
Expand All @@ -47,6 +48,11 @@ type Compactable interface {
Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
}

type KvGetter interface {
RevGetter
CompactNotify() chan struct{}
}

type RevGetter interface {
Rev() int64
}
Expand All @@ -56,17 +62,20 @@ func New(
lg *zap.Logger,
mode string,
retention time.Duration,
rg RevGetter,
kg KvGetter,
c Compactable,
) (Compactor, error) {
if lg == nil {
lg = zap.NewNop()
}

switch mode {
case ModePeriodic:
return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil
return newPeriodic(lg, clockwork.NewRealClock(), retention, kg, c), nil
case ModeRevision:
return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil
return newRevision(lg, clockwork.NewRealClock(), int64(retention), kg, c), nil
case ModeRevisionThreshold:
return newRevisionThreshold(lg, clockwork.NewRealClock(), kg, c), nil
default:
return nil, fmt.Errorf("unsupported compaction mode %s", mode)
}
Expand Down
112 changes: 112 additions & 0 deletions server/etcdserver/api/v3compactor/revision_threshold.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2023 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v3compactor

import (
"context"
"sync"
"time"

"github.com/jonboulle/clockwork"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.uber.org/zap"
)

type RevisionThreshold struct {
lg *zap.Logger

clock clockwork.Clock

kv KvGetter
c Compactable

ctx context.Context
cancel context.CancelFunc

mu sync.Mutex
paused bool
}

func newRevisionThreshold(lg *zap.Logger, clock clockwork.Clock, kv KvGetter, c Compactable) *RevisionThreshold {
rc := &RevisionThreshold{
lg: lg,
clock: clock,
kv: kv,
c: c,
}
rc.ctx, rc.cancel = context.WithCancel(context.Background())
return rc
}

func (rc *RevisionThreshold) Run() {
go func() {
for {
select {
case <-rc.ctx.Done():
return
case <-rc.kv.CompactNotify():
rc.mu.Lock()
p := rc.paused
rc.mu.Unlock()
if p {
continue
}
}

rev := rc.kv.Rev()

now := time.Now()
rc.lg.Info(
"starting auto revision compaction",
zap.Int64("revision", rev),
)

_, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
rc.lg.Info(
"completed auto revision compaction",
zap.Int64("revision", rev),
zap.Duration("took", time.Since(now)),
)
} else {
rc.lg.Warn(
"failed auto revision compaction",
zap.Int64("revision", rev),
zap.Error(err),
)
}
}
}()
}

// Stop stops revision-based compactor.
func (rc *RevisionThreshold) Stop() {
rc.cancel()
}

// Pause pauses revision-based compactor.
func (rc *RevisionThreshold) Pause() {
rc.mu.Lock()
rc.paused = true
rc.mu.Unlock()
}

// Resume resumes revision-based compactor.
func (rc *RevisionThreshold) Resume() {
rc.mu.Lock()
rc.paused = false
rc.mu.Unlock()
}
11 changes: 10 additions & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
}
if cfg.AutoCompactionMode == v3compactor.ModeRevisionThreshold {
mvccStoreConfig.CompactionNotifyThreshold = int64(cfg.AutoCompactionRetention)
}

srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())

Expand All @@ -387,7 +391,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
}()
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, any(srv.kv).(kvGetter), srv)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2479,3 +2483,8 @@ func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
func (s *EtcdServer) CorruptionChecker() CorruptionChecker {
return s.corruptionChecker
}

type kvGetter interface {
CompactNotify() chan struct{}
Rev() int64
}
34 changes: 32 additions & 2 deletions server/storage/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ var minimumBatchInterval = 10 * time.Millisecond
type StoreConfig struct {
CompactionBatchLimit int
CompactionSleepInterval time.Duration
// CompactionNotifyThreshold is used to guarantee that a notification
// is sent only after configured number of write transactions have
// occured since previsious compaction.
CompactionNotifyThreshold int64
}

type store struct {
Expand All @@ -77,6 +81,9 @@ type store struct {
currentRev int64
// compactMainRev is the main revision of the last compaction.
compactMainRev int64
// compactNotifyCh is used to notify the compactor that it's time to
// compact.
compactNotifyCh chan struct{}

fifoSched schedule.Scheduler

Expand Down Expand Up @@ -105,8 +112,9 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi

le: le,

currentRev: 1,
compactMainRev: -1,
currentRev: 1,
compactMainRev: -1,
compactNotifyCh: make(chan struct{}, 1),

fifoSched: schedule.NewFIFOScheduler(lg),

Expand Down Expand Up @@ -488,6 +496,9 @@ func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, k

func (s *store) Close() error {
close(s.stopc)
if s.compactNotifyCh != nil {
close(s.compactNotifyCh)
}
s.fifoSched.Stop()
return nil
}
Expand Down Expand Up @@ -539,3 +550,22 @@ func isTombstone(b []byte) bool {
func (s *store) HashStorage() HashStorage {
return s.hashes
}

func (s *store) CompactNotify() chan struct{} {
return s.compactNotifyCh
}

func (s *store) doCompactNotify() {
threshold := s.cfg.CompactionNotifyThreshold

if threshold <= 0 {
return
}

if s.currentRev-s.compactMainRev > threshold {
select {
case s.compactNotifyCh <- struct{}{}:
default:
}
}
}
2 changes: 2 additions & 0 deletions server/storage/mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ func (tw *storeTxnWrite) End() {
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.revMu.Lock()
tw.s.currentRev++

tw.s.doCompactNotify()
}
tw.tx.Unlock()
if len(tw.changes) != 0 {
Expand Down
24 changes: 23 additions & 1 deletion server/storage/quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ const (
// MaxQuotaBytes is the maximum number of bytes suggested for a backend
// quota. A larger quota may lead to degraded performance.
MaxQuotaBytes = int64(8 * 1024 * 1024 * 1024) // 8GB

// MaxAllowedOverflowQuotaBytes is the number of bytes the backend size
// can be overflow after exceeding the space quota.
MaxAllowedOverflowQuotaBytes = int64(1024 * 1024 * 1024) // 1GB
)

// Quota represents an arbitrary quota against arbitrary requests. Each request
Expand Down Expand Up @@ -130,7 +134,17 @@ func (b *BackendQuota) Available(v interface{}) bool {
return true
}
// TODO: maybe optimize Backend.Size()
return b.be.Size()+int64(cost) < b.maxBackendBytes

// Since the compact comes with allocatable pages, we should check the
// SizeInUse first. If there is no continuous pages for key/value and
// the boltdb continues to resize, it should not increase more than 1
// GiB. It's hard limitation.
//
// TODO: It should be enabled by flag.
if b.be.Size()+int64(cost)-b.maxBackendBytes >= maxAllowedOverflowBytes(b.maxBackendBytes) {
return false
}
return b.be.SizeInUse()+int64(cost) < b.maxBackendBytes
}

func (b *BackendQuota) Cost(v interface{}) int {
Expand Down Expand Up @@ -174,3 +188,11 @@ func costTxn(r *pb.TxnRequest) int {
func (b *BackendQuota) Remaining() int64 {
return b.maxBackendBytes - b.be.Size()
}

func maxAllowedOverflowBytes(maxBackendBytes int64) int64 {
allow := maxBackendBytes * 10 / 100
if allow > MaxAllowedOverflowQuotaBytes {
allow = MaxAllowedOverflowQuotaBytes
}
return allow
}
15 changes: 11 additions & 4 deletions tools/benchmark/cmd/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ var putCmd = &cobra.Command{
}

var (
keySize int
valSize int
keySize int
valSize int
deltaValSize int

putTotal int
putRate int
Expand All @@ -61,6 +62,7 @@ func init() {
RootCmd.AddCommand(putCmd)
putCmd.Flags().IntVar(&keySize, "key-size", 8, "Key size of put request")
putCmd.Flags().IntVar(&valSize, "val-size", 8, "Value size of put request")
putCmd.Flags().IntVar(&deltaValSize, "delta-val-size", 0, "Delta of value size of put request")
putCmd.Flags().IntVar(&putRate, "rate", 0, "Maximum puts per second (0 is no limit)")

putCmd.Flags().IntVar(&putTotal, "total", 10000, "Total number of put requests")
Expand All @@ -83,7 +85,7 @@ func putFunc(cmd *cobra.Command, _ []string) {
}
limit := rate.NewLimiter(rate.Limit(putRate), 1)
clients := mustCreateClients(totalClients, totalConns)
k, v := make([]byte, keySize), string(mustRandBytes(valSize))
k, v := make([]byte, keySize), string(mustRandBytes(valSize+deltaValSize))

bar = pb.New(putTotal)
bar.Start()
Expand All @@ -104,14 +106,19 @@ func putFunc(cmd *cobra.Command, _ []string) {
}(clients[i])
}

baseValSize := valSize - deltaValSize
go func() {
for i := 0; i < putTotal; i++ {
if seqKeys {
binary.PutVarint(k, int64(i%keySpaceSize))
} else {
binary.PutVarint(k, int64(rand.Intn(keySpaceSize)))
}
requests <- v3.OpPut(string(k), v)
deltaV := v
if deltaValSize > 0 {
deltaV = v[:baseValSize+rand.Intn(2*deltaValSize)]
}
requests <- v3.OpPut(string(k), deltaV)
}
close(requests)
}()
Expand Down
Loading