diff --git a/server/embed/config.go b/server/embed/config.go index 0b79ce4bc25..5f1e6ca7e5d 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -139,6 +139,14 @@ var ( // revision 5000 when the current revision is 6000. // This runs every 5-minute if enough of logs have proceeded. CompactorModeRevision = v3compactor.ModeRevision + + // CompactorModeRevisionThreshold is revision-change-based compaction + // mode for "Config.AutoCompactionMode" field. + // If "AutoCompactionMode" is CompactorModeRevisionThreshold and + // "AutoCompactionRetention" is "10000", it compacts log on revision + // 10000 when the current revision is 20000. This runs if there are + // more than 10000 revisions have occurred since previous compaction. + CompactorModeRevisionThreshold = v3compactor.ModeRevisionThreshold ) func init() { @@ -775,7 +783,7 @@ func (cfg *Config) Validate() error { } switch cfg.AutoCompactionMode { - case CompactorModeRevision, CompactorModePeriodic: + case CompactorModeRevision, CompactorModePeriodic, CompactorModeRevisionThreshold: case "": return errors.New("undefined auto-compaction-mode") default: diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 594e11ec385..bf438ee78d2 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -888,6 +888,8 @@ func parseCompactionRetention(mode, retention string) (ret time.Duration, err er ret = time.Duration(int64(h)) case CompactorModePeriodic: ret = time.Duration(int64(h)) * time.Hour + case CompactorModeRevisionThreshold: + ret = time.Duration(int64(h)) case "": return 0, errors.New("--auto-compaction-mode is undefined") } diff --git a/server/etcdserver/api/v3compactor/compactor.go b/server/etcdserver/api/v3compactor/compactor.go index e352670c12b..f4ade944619 100644 --- a/server/etcdserver/api/v3compactor/compactor.go +++ b/server/etcdserver/api/v3compactor/compactor.go @@ -26,8 +26,9 @@ import ( ) const ( - ModePeriodic = "periodic" - ModeRevision = "revision" + ModePeriodic = "periodic" + ModeRevision = "revision" + ModeRevisionThreshold = "revision-threshold" ) // Compactor purges old log from the storage periodically. @@ -47,6 +48,11 @@ type Compactable interface { Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) } +type KvGetter interface { + RevGetter + CompactNotify() chan struct{} +} + type RevGetter interface { Rev() int64 } @@ -56,17 +62,20 @@ func New( lg *zap.Logger, mode string, retention time.Duration, - rg RevGetter, + kg KvGetter, c Compactable, ) (Compactor, error) { if lg == nil { lg = zap.NewNop() } + switch mode { case ModePeriodic: - return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil + return newPeriodic(lg, clockwork.NewRealClock(), retention, kg, c), nil case ModeRevision: - return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil + return newRevision(lg, clockwork.NewRealClock(), int64(retention), kg, c), nil + case ModeRevisionThreshold: + return newRevisionThreshold(lg, clockwork.NewRealClock(), kg, c), nil default: return nil, fmt.Errorf("unsupported compaction mode %s", mode) } diff --git a/server/etcdserver/api/v3compactor/revision_threshold.go b/server/etcdserver/api/v3compactor/revision_threshold.go new file mode 100644 index 00000000000..356577ede00 --- /dev/null +++ b/server/etcdserver/api/v3compactor/revision_threshold.go @@ -0,0 +1,112 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v3compactor + +import ( + "context" + "sync" + "time" + + "github.com/jonboulle/clockwork" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/server/v3/storage/mvcc" + "go.uber.org/zap" +) + +type RevisionThreshold struct { + lg *zap.Logger + + clock clockwork.Clock + + kv KvGetter + c Compactable + + ctx context.Context + cancel context.CancelFunc + + mu sync.Mutex + paused bool +} + +func newRevisionThreshold(lg *zap.Logger, clock clockwork.Clock, kv KvGetter, c Compactable) *RevisionThreshold { + rc := &RevisionThreshold{ + lg: lg, + clock: clock, + kv: kv, + c: c, + } + rc.ctx, rc.cancel = context.WithCancel(context.Background()) + return rc +} + +func (rc *RevisionThreshold) Run() { + go func() { + for { + select { + case <-rc.ctx.Done(): + return + case <-rc.kv.CompactNotify(): + rc.mu.Lock() + p := rc.paused + rc.mu.Unlock() + if p { + continue + } + } + + rev := rc.kv.Rev() + + now := time.Now() + rc.lg.Info( + "starting auto revision compaction", + zap.Int64("revision", rev), + ) + + _, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev}) + if err == nil || err == mvcc.ErrCompacted { + rc.lg.Info( + "completed auto revision compaction", + zap.Int64("revision", rev), + zap.Duration("took", time.Since(now)), + ) + } else { + rc.lg.Warn( + "failed auto revision compaction", + zap.Int64("revision", rev), + zap.Error(err), + ) + } + } + }() +} + +// Stop stops revision-based compactor. +func (rc *RevisionThreshold) Stop() { + rc.cancel() +} + +// Pause pauses revision-based compactor. +func (rc *RevisionThreshold) Pause() { + rc.mu.Lock() + rc.paused = true + rc.mu.Unlock() +} + +// Resume resumes revision-based compactor. +func (rc *RevisionThreshold) Resume() { + rc.mu.Lock() + rc.paused = false + rc.mu.Unlock() +} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 4a2128104c9..b43db86cf5c 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -373,6 +373,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { CompactionBatchLimit: cfg.CompactionBatchLimit, CompactionSleepInterval: cfg.CompactionSleepInterval, } + if cfg.AutoCompactionMode == v3compactor.ModeRevisionThreshold { + mvccStoreConfig.CompactionNotifyThreshold = int64(cfg.AutoCompactionRetention) + } + srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage()) @@ -387,7 +391,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() if num := cfg.AutoCompactionRetention; num != 0 { - srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv) + srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, any(srv.kv).(kvGetter), srv) if err != nil { return nil, err } @@ -2479,3 +2483,8 @@ func (s *EtcdServer) getTxPostLockInsideApplyHook() func() { func (s *EtcdServer) CorruptionChecker() CorruptionChecker { return s.corruptionChecker } + +type kvGetter interface { + CompactNotify() chan struct{} + Rev() int64 +} diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index eeb82c68bc9..1e7df6bb664 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -53,6 +53,10 @@ var minimumBatchInterval = 10 * time.Millisecond type StoreConfig struct { CompactionBatchLimit int CompactionSleepInterval time.Duration + // CompactionNotifyThreshold is used to guarantee that a notification + // is sent only after configured number of write transactions have + // occured since previsious compaction. + CompactionNotifyThreshold int64 } type store struct { @@ -77,6 +81,9 @@ type store struct { currentRev int64 // compactMainRev is the main revision of the last compaction. compactMainRev int64 + // compactNotifyCh is used to notify the compactor that it's time to + // compact. + compactNotifyCh chan struct{} fifoSched schedule.Scheduler @@ -105,8 +112,9 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi le: le, - currentRev: 1, - compactMainRev: -1, + currentRev: 1, + compactMainRev: -1, + compactNotifyCh: make(chan struct{}, 1), fifoSched: schedule.NewFIFOScheduler(lg), @@ -488,6 +496,9 @@ func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, k func (s *store) Close() error { close(s.stopc) + if s.compactNotifyCh != nil { + close(s.compactNotifyCh) + } s.fifoSched.Stop() return nil } @@ -539,3 +550,22 @@ func isTombstone(b []byte) bool { func (s *store) HashStorage() HashStorage { return s.hashes } + +func (s *store) CompactNotify() chan struct{} { + return s.compactNotifyCh +} + +func (s *store) doCompactNotify() { + threshold := s.cfg.CompactionNotifyThreshold + + if threshold <= 0 { + return + } + + if s.currentRev-s.compactMainRev > threshold { + select { + case s.compactNotifyCh <- struct{}{}: + default: + } + } +} diff --git a/server/storage/mvcc/kvstore_txn.go b/server/storage/mvcc/kvstore_txn.go index b93fcbe64da..ec5565e28aa 100644 --- a/server/storage/mvcc/kvstore_txn.go +++ b/server/storage/mvcc/kvstore_txn.go @@ -180,6 +180,8 @@ func (tw *storeTxnWrite) End() { // hold revMu lock to prevent new read txns from opening until writeback. tw.s.revMu.Lock() tw.s.currentRev++ + + tw.s.doCompactNotify() } tw.tx.Unlock() if len(tw.changes) != 0 { diff --git a/server/storage/quota.go b/server/storage/quota.go index f24ca987cb4..a2007069db9 100644 --- a/server/storage/quota.go +++ b/server/storage/quota.go @@ -31,6 +31,10 @@ const ( // MaxQuotaBytes is the maximum number of bytes suggested for a backend // quota. A larger quota may lead to degraded performance. MaxQuotaBytes = int64(8 * 1024 * 1024 * 1024) // 8GB + + // MaxAllowedOverflowQuotaBytes is the number of bytes the backend size + // can be overflow after exceeding the space quota. + MaxAllowedOverflowQuotaBytes = int64(1024 * 1024 * 1024) // 1GB ) // Quota represents an arbitrary quota against arbitrary requests. Each request @@ -130,7 +134,17 @@ func (b *BackendQuota) Available(v interface{}) bool { return true } // TODO: maybe optimize Backend.Size() - return b.be.Size()+int64(cost) < b.maxBackendBytes + + // Since the compact comes with allocatable pages, we should check the + // SizeInUse first. If there is no continuous pages for key/value and + // the boltdb continues to resize, it should not increase more than 1 + // GiB. It's hard limitation. + // + // TODO: It should be enabled by flag. + if b.be.Size()+int64(cost)-b.maxBackendBytes >= maxAllowedOverflowBytes(b.maxBackendBytes) { + return false + } + return b.be.SizeInUse()+int64(cost) < b.maxBackendBytes } func (b *BackendQuota) Cost(v interface{}) int { @@ -174,3 +188,11 @@ func costTxn(r *pb.TxnRequest) int { func (b *BackendQuota) Remaining() int64 { return b.maxBackendBytes - b.be.Size() } + +func maxAllowedOverflowBytes(maxBackendBytes int64) int64 { + allow := maxBackendBytes * 10 / 100 + if allow > MaxAllowedOverflowQuotaBytes { + allow = MaxAllowedOverflowQuotaBytes + } + return allow +} diff --git a/tools/benchmark/cmd/put.go b/tools/benchmark/cmd/put.go index 5e3943ff22d..58f0af3bbd1 100644 --- a/tools/benchmark/cmd/put.go +++ b/tools/benchmark/cmd/put.go @@ -42,8 +42,9 @@ var putCmd = &cobra.Command{ } var ( - keySize int - valSize int + keySize int + valSize int + deltaValSize int putTotal int putRate int @@ -61,6 +62,7 @@ func init() { RootCmd.AddCommand(putCmd) putCmd.Flags().IntVar(&keySize, "key-size", 8, "Key size of put request") putCmd.Flags().IntVar(&valSize, "val-size", 8, "Value size of put request") + putCmd.Flags().IntVar(&deltaValSize, "delta-val-size", 0, "Delta of value size of put request") putCmd.Flags().IntVar(&putRate, "rate", 0, "Maximum puts per second (0 is no limit)") putCmd.Flags().IntVar(&putTotal, "total", 10000, "Total number of put requests") @@ -83,7 +85,7 @@ func putFunc(cmd *cobra.Command, _ []string) { } limit := rate.NewLimiter(rate.Limit(putRate), 1) clients := mustCreateClients(totalClients, totalConns) - k, v := make([]byte, keySize), string(mustRandBytes(valSize)) + k, v := make([]byte, keySize), string(mustRandBytes(valSize+deltaValSize)) bar = pb.New(putTotal) bar.Start() @@ -104,6 +106,7 @@ func putFunc(cmd *cobra.Command, _ []string) { }(clients[i]) } + baseValSize := valSize - deltaValSize go func() { for i := 0; i < putTotal; i++ { if seqKeys { @@ -111,7 +114,11 @@ func putFunc(cmd *cobra.Command, _ []string) { } else { binary.PutVarint(k, int64(rand.Intn(keySpaceSize))) } - requests <- v3.OpPut(string(k), v) + deltaV := v + if deltaValSize > 0 { + deltaV = v[:baseValSize+rand.Intn(2*deltaValSize)] + } + requests <- v3.OpPut(string(k), deltaV) } close(requests) }() diff --git a/tools/benchmark/cmd/root.go b/tools/benchmark/cmd/root.go index 0a22e031b95..e7c3d888ada 100644 --- a/tools/benchmark/cmd/root.go +++ b/tools/benchmark/cmd/root.go @@ -15,6 +15,7 @@ package cmd import ( + "math/rand" "sync" "time" @@ -58,6 +59,8 @@ var ( ) func init() { + rand.Seed(time.Now().UnixNano()) + RootCmd.PersistentFlags().StringSliceVar(&endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints") RootCmd.PersistentFlags().UintVar(&totalConns, "conns", 1, "Total number of gRPC connections") RootCmd.PersistentFlags().UintVar(&totalClients, "clients", 1, "Total number of gRPC clients") diff --git a/tools/benchmark/cmd/txn_put.go b/tools/benchmark/cmd/txn_put.go index 12990c6cbc1..a55f5bf4347 100644 --- a/tools/benchmark/cmd/txn_put.go +++ b/tools/benchmark/cmd/txn_put.go @@ -19,6 +19,7 @@ import ( "encoding/binary" "fmt" "math" + "math/rand" "os" "time" @@ -48,6 +49,7 @@ func init() { RootCmd.AddCommand(txnPutCmd) txnPutCmd.Flags().IntVar(&keySize, "key-size", 8, "Key size of txn put") txnPutCmd.Flags().IntVar(&valSize, "val-size", 8, "Value size of txn put") + txnPutCmd.Flags().IntVar(&deltaValSize, "delta-val-size", 0, "Delta of value size of put request") txnPutCmd.Flags().IntVar(&txnPutOpsPerTxn, "txn-ops", 1, "Number of puts per txn") txnPutCmd.Flags().IntVar(&txnPutRate, "rate", 0, "Maximum txns per second (0 is no limit)") @@ -73,7 +75,7 @@ func txnPutFunc(_ *cobra.Command, _ []string) { } limit := rate.NewLimiter(rate.Limit(txnPutRate), 1) clients := mustCreateClients(totalClients, totalConns) - k, v := make([]byte, keySize), string(mustRandBytes(valSize)) + k, v := make([]byte, keySize), string(mustRandBytes(valSize+deltaValSize)) bar = pb.New(txnPutTotal) bar.Start() @@ -93,12 +95,18 @@ func txnPutFunc(_ *cobra.Command, _ []string) { }(clients[i]) } + baseValSize := valSize - deltaValSize go func() { for i := 0; i < txnPutTotal; i++ { ops := make([]v3.Op, txnPutOpsPerTxn) for j := 0; j < txnPutOpsPerTxn; j++ { binary.PutVarint(k, int64(((i*txnPutOpsPerTxn)+j)%keySpaceSize)) - ops[j] = v3.OpPut(string(k), v) + + deltaV := v + if deltaValSize > 0 { + deltaV = v[:baseValSize+rand.Intn(2*deltaValSize)] + } + ops[j] = v3.OpPut(string(k), deltaV) } requests <- ops }