From 4db8df677c618b462145fce7cb926c072a0ce932 Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Mon, 14 Aug 2023 10:08:23 +0000
Subject: [PATCH] feature: add new compactor based on revision count

What would you like to be added?

Add a new compactor based on revision count, instead of a fixed interval time.
To make this happen, the mvcc store needs to export a `CompactNotify` function
to notify the compactor that the configured number of write transactions have
occurred since the previous compaction. The new compactor can observe the
revision change and delete out-of-date data promptly, instead of waiting for a
fixed interval to elapse, and the underlying bbolt db can reuse the freed pages
as soon as possible.

Why is this needed?

In a Kubernetes cluster running, for instance, Argo Workflows, there are
batches of requests to create pods, followed by a lot of PATCH requests for
pod status, especially when a pod has more than 3 containers. If such burst
requests grow the db size in a short time, it is easy to exceed the max quota
size. The cluster admin then has to get involved to defrag, which may cause
long downtime. So we hope etcd can delete out-of-date data as soon as possible
and slow the growth of the total db size.

Currently, both the revision and periodic compactors are driven by a timer. A
fixed interval cannot easily absorb unexpected bursts of update requests. The
new compactor, based on revision count, can make the admin's life easier. For
instance, let's say the average object size is 50 KiB and the new compactor
compacts every 10,000 revisions. In effect, etcd compacts after roughly 500 MiB
of new data has been written, no matter how long it takes to accumulate those
10,000 revisions. It can handle burst update requests well.

There are some test results:

* Fixed value size: 10 KiB, Update Rate: 100/s, Total key space: 3,000

```
benchmark put --rate=100 --total=300000 --compact-interval=0 \
  --key-space-size=3000 --key-size=256 --val-size=10240
```

| Compactor | DB Total Size | DB InUse Size |
| -- | -- | -- |
| Revision(5min,retention:10000) | 570 MiB | 208 MiB |
| Periodic(1m) | 232 MiB | 165 MiB |
| Periodic(30s) | 151 MiB | 127 MiB |
| NewRevision(retention:10000) | 195 MiB | 187 MiB |

* Random value size: [9 KiB, 11 KiB], Update Rate: 150/s, Total key space: 3,000

```
benchmark put --rate=150 --total=300000 --compact-interval=0 \
  --key-space-size=3000 --key-size=256 --val-size=10240 \
  --delta-val-size=1024
```

| Compactor | DB Total Size | DB InUse Size |
| -- | -- | -- |
| Revision(5min,retention:10000) | 718 MiB | 554 MiB |
| Periodic(1m) | 297 MiB | 246 MiB |
| Periodic(30s) | 185 MiB | 146 MiB |
| NewRevision(retention:10000) | 186 MiB | 178 MiB |

* Random value size: [6 KiB, 14 KiB], Update Rate: 200/s, Total key space: 3,000

```
benchmark put --rate=200 --total=300000 --compact-interval=0 \
  --key-space-size=3000 --key-size=256 --val-size=10240 \
  --delta-val-size=4096
```

| Compactor | DB Total Size | DB InUse Size |
| -- | -- | -- |
| Revision(5min,retention:10000) | 874 MiB | 221 MiB |
| Periodic(1m) | 357 MiB | 260 MiB |
| Periodic(30s) | 215 MiB | 151 MiB |
| NewRevision(retention:10000) | 182 MiB | 176 MiB |

To handle burst requests with the periodic compactor, we would need to use a
short interval; otherwise, the total size becomes large. I think the new
compactor handles this well.

Additional Change:

Currently, the quota system only checks the DB total size. However, there
could be a lot of free pages that can be reused by upcoming requests. Based on
this proposal, I also want to extend the current quota system with the DB's
InUse size.
If the InUse size is less than the max quota size, we should allow requests to
update. Since bbolt might be resized when there are no contiguous free pages
available, we should set up a hard limit for the overflow, like 1 GiB.

```diff
 // Quota represents an arbitrary quota against arbitrary requests. Each request
@@ -130,7 +134,17 @@ func (b *BackendQuota) Available(v interface{}) bool {
 		return true
 	}
 	// TODO: maybe optimize Backend.Size()
-	return b.be.Size()+int64(cost) < b.maxBackendBytes
+
+	// Since compaction frees pages that can be reused, we should check
+	// SizeInUse first. If there are no contiguous free pages for a
+	// key/value and boltdb keeps resizing, the total size must not exceed
+	// the quota by more than 1 GiB. That is a hard limit.
+	//
+	// TODO: It should be enabled by a flag.
+	if b.be.Size()+int64(cost)-b.maxBackendBytes >= maxAllowedOverflowBytes(b.maxBackendBytes) {
+		return false
+	}
+	return b.be.SizeInUse()+int64(cost) < b.maxBackendBytes
 }
```

It is also likely that the NOSPACE alarm can be avoided when compaction frees
enough pages, which reduces downtime.

Signed-off-by: Wei Fu
--- server/embed/config.go | 10 +- server/embed/etcd.go | 2 + .../etcdserver/api/v3compactor/compactor.go | 19 ++- .../api/v3compactor/revision_threshold.go | 112 ++++++++++++++++++ server/etcdserver/server.go | 11 +- server/storage/mvcc/kvstore.go | 34 +++++- server/storage/mvcc/kvstore_txn.go | 2 + server/storage/quota.go | 24 +++- tools/benchmark/cmd/put.go | 15 ++- tools/benchmark/cmd/root.go | 3 + tools/benchmark/cmd/txn_put.go | 12 +- 11 files changed, 228 insertions(+), 16 deletions(-) create mode 100644 server/etcdserver/api/v3compactor/revision_threshold.go diff --git a/server/embed/config.go b/server/embed/config.go index 0b79ce4bc25..5f1e6ca7e5d 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -139,6 +139,14 @@ var ( // revision 5000 when the current revision is 6000. // This runs every 5-minute if enough of logs have proceeded. CompactorModeRevision = v3compactor.ModeRevision + + // CompactorModeRevisionThreshold is revision-change-based compaction + // mode for "Config.AutoCompactionMode" field. + // If "AutoCompactionMode" is CompactorModeRevisionThreshold and + // "AutoCompactionRetention" is "10000", it compacts log on revision + // 10000 when the current revision is 20000. This runs once more than + // 10000 revisions have occurred since the previous compaction.
+ CompactorModeRevisionThreshold = v3compactor.ModeRevisionThreshold ) func init() { @@ -775,7 +783,7 @@ func (cfg *Config) Validate() error { } switch cfg.AutoCompactionMode { - case CompactorModeRevision, CompactorModePeriodic: + case CompactorModeRevision, CompactorModePeriodic, CompactorModeRevisionThreshold: case "": return errors.New("undefined auto-compaction-mode") default: diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 594e11ec385..bf438ee78d2 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -888,6 +888,8 @@ func parseCompactionRetention(mode, retention string) (ret time.Duration, err er ret = time.Duration(int64(h)) case CompactorModePeriodic: ret = time.Duration(int64(h)) * time.Hour + case CompactorModeRevisionThreshold: + ret = time.Duration(int64(h)) case "": return 0, errors.New("--auto-compaction-mode is undefined") } diff --git a/server/etcdserver/api/v3compactor/compactor.go b/server/etcdserver/api/v3compactor/compactor.go index e352670c12b..f4ade944619 100644 --- a/server/etcdserver/api/v3compactor/compactor.go +++ b/server/etcdserver/api/v3compactor/compactor.go @@ -26,8 +26,9 @@ import ( ) const ( - ModePeriodic = "periodic" - ModeRevision = "revision" + ModePeriodic = "periodic" + ModeRevision = "revision" + ModeRevisionThreshold = "revision-threshold" ) // Compactor purges old log from the storage periodically. @@ -47,6 +48,11 @@ type Compactable interface { Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) } +type KvGetter interface { + RevGetter + CompactNotify() chan struct{} +} + type RevGetter interface { Rev() int64 } @@ -56,17 +62,20 @@ func New( lg *zap.Logger, mode string, retention time.Duration, - rg RevGetter, + kg KvGetter, c Compactable, ) (Compactor, error) { if lg == nil { lg = zap.NewNop() } + switch mode { case ModePeriodic: - return newPeriodic(lg, clockwork.NewRealClock(), retention, rg, c), nil + return newPeriodic(lg, clockwork.NewRealClock(), retention, kg, c), nil case ModeRevision: - return newRevision(lg, clockwork.NewRealClock(), int64(retention), rg, c), nil + return newRevision(lg, clockwork.NewRealClock(), int64(retention), kg, c), nil + case ModeRevisionThreshold: + return newRevisionThreshold(lg, clockwork.NewRealClock(), kg, c), nil default: return nil, fmt.Errorf("unsupported compaction mode %s", mode) } diff --git a/server/etcdserver/api/v3compactor/revision_threshold.go b/server/etcdserver/api/v3compactor/revision_threshold.go new file mode 100644 index 00000000000..356577ede00 --- /dev/null +++ b/server/etcdserver/api/v3compactor/revision_threshold.go @@ -0,0 +1,112 @@ +// Copyright 2023 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package v3compactor + +import ( + "context" + "sync" + "time" + + "github.com/jonboulle/clockwork" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/server/v3/storage/mvcc" + "go.uber.org/zap" +) + +type RevisionThreshold struct { + lg *zap.Logger + + clock clockwork.Clock + + kv KvGetter + c Compactable + + ctx context.Context + cancel context.CancelFunc + + mu sync.Mutex + paused bool +} + +func newRevisionThreshold(lg *zap.Logger, clock clockwork.Clock, kv KvGetter, c Compactable) *RevisionThreshold { + rc := &RevisionThreshold{ + lg: lg, + clock: clock, + kv: kv, + c: c, + } + rc.ctx, rc.cancel = context.WithCancel(context.Background()) + return rc +} + +func (rc *RevisionThreshold) Run() { + go func() { + for { + select { + case <-rc.ctx.Done(): + return + case <-rc.kv.CompactNotify(): + rc.mu.Lock() + p := rc.paused + rc.mu.Unlock() + if p { + continue + } + } + + rev := rc.kv.Rev() + + now := time.Now() + rc.lg.Info( + "starting auto revision compaction", + zap.Int64("revision", rev), + ) + + _, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev}) + if err == nil || err == mvcc.ErrCompacted { + rc.lg.Info( + "completed auto revision compaction", + zap.Int64("revision", rev), + zap.Duration("took", time.Since(now)), + ) + } else { + rc.lg.Warn( + "failed auto revision compaction", + zap.Int64("revision", rev), + zap.Error(err), + ) + } + } + }() +} + +// Stop stops revision-based compactor. +func (rc *RevisionThreshold) Stop() { + rc.cancel() +} + +// Pause pauses revision-based compactor. +func (rc *RevisionThreshold) Pause() { + rc.mu.Lock() + rc.paused = true + rc.mu.Unlock() +} + +// Resume resumes revision-based compactor. +func (rc *RevisionThreshold) Resume() { + rc.mu.Lock() + rc.paused = false + rc.mu.Unlock() +} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 4a2128104c9..b43db86cf5c 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -373,6 +373,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { CompactionBatchLimit: cfg.CompactionBatchLimit, CompactionSleepInterval: cfg.CompactionSleepInterval, } + if cfg.AutoCompactionMode == v3compactor.ModeRevisionThreshold { + mvccStoreConfig.CompactionNotifyThreshold = int64(cfg.AutoCompactionRetention) + } + srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig) srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage()) @@ -387,7 +391,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } }() if num := cfg.AutoCompactionRetention; num != 0 { - srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv) + srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, any(srv.kv).(kvGetter), srv) if err != nil { return nil, err } @@ -2479,3 +2483,8 @@ func (s *EtcdServer) getTxPostLockInsideApplyHook() func() { func (s *EtcdServer) CorruptionChecker() CorruptionChecker { return s.corruptionChecker } + +type kvGetter interface { + CompactNotify() chan struct{} + Rev() int64 +} diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index eeb82c68bc9..1e7df6bb664 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -53,6 +53,10 @@ var minimumBatchInterval = 10 * time.Millisecond type StoreConfig struct { CompactionBatchLimit int CompactionSleepInterval time.Duration + // CompactionNotifyThreshold is used to guarantee that a notification + // is sent only after configured 
number of write transactions have + // occurred since the previous compaction. + CompactionNotifyThreshold int64 } type store struct { @@ -77,6 +81,9 @@ type store struct { currentRev int64 // compactMainRev is the main revision of the last compaction. compactMainRev int64 + // compactNotifyCh is used to notify the compactor that it's time to + // compact. + compactNotifyCh chan struct{} fifoSched schedule.Scheduler @@ -105,8 +112,9 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi le: le, - currentRev: 1, - compactMainRev: -1, + currentRev: 1, + compactMainRev: -1, + compactNotifyCh: make(chan struct{}, 1), fifoSched: schedule.NewFIFOScheduler(lg), @@ -488,6 +496,9 @@ func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, k func (s *store) Close() error { close(s.stopc) + if s.compactNotifyCh != nil { + close(s.compactNotifyCh) + } s.fifoSched.Stop() return nil } @@ -539,3 +550,22 @@ func isTombstone(b []byte) bool { func (s *store) HashStorage() HashStorage { return s.hashes } + +func (s *store) CompactNotify() chan struct{} { + return s.compactNotifyCh +} + +func (s *store) doCompactNotify() { + threshold := s.cfg.CompactionNotifyThreshold + + if threshold <= 0 { + return + } + + if s.currentRev-s.compactMainRev > threshold { + select { + case s.compactNotifyCh <- struct{}{}: + default: + } + } +} diff --git a/server/storage/mvcc/kvstore_txn.go b/server/storage/mvcc/kvstore_txn.go index b93fcbe64da..ec5565e28aa 100644 --- a/server/storage/mvcc/kvstore_txn.go +++ b/server/storage/mvcc/kvstore_txn.go @@ -180,6 +180,8 @@ func (tw *storeTxnWrite) End() { // hold revMu lock to prevent new read txns from opening until writeback. tw.s.revMu.Lock() tw.s.currentRev++ + + tw.s.doCompactNotify() } tw.tx.Unlock() if len(tw.changes) != 0 { diff --git a/server/storage/quota.go b/server/storage/quota.go index f24ca987cb4..a2007069db9 100644 --- a/server/storage/quota.go +++ b/server/storage/quota.go @@ -31,6 +31,10 @@ const ( // MaxQuotaBytes is the maximum number of bytes suggested for a backend // quota. A larger quota may lead to degraded performance. MaxQuotaBytes = int64(8 * 1024 * 1024 * 1024) // 8GB + + // MaxAllowedOverflowQuotaBytes is the maximum number of bytes by which + // the backend size may overflow the space quota. + MaxAllowedOverflowQuotaBytes = int64(1024 * 1024 * 1024) // 1GB ) // Quota represents an arbitrary quota against arbitrary requests. Each request @@ -130,7 +134,17 @@ func (b *BackendQuota) Available(v interface{}) bool { return true } // TODO: maybe optimize Backend.Size() - return b.be.Size()+int64(cost) < b.maxBackendBytes + + // Since compaction frees pages that can be reused, we should check + // SizeInUse first. If there are no contiguous free pages for a + // key/value and boltdb keeps resizing, the total size must not exceed + // the quota by more than 1 GiB. That is a hard limit. + // + // TODO: It should be enabled by a flag.
+ if b.be.Size()+int64(cost)-b.maxBackendBytes >= maxAllowedOverflowBytes(b.maxBackendBytes) { + return false + } + return b.be.SizeInUse()+int64(cost) < b.maxBackendBytes } func (b *BackendQuota) Cost(v interface{}) int { @@ -174,3 +188,11 @@ func costTxn(r *pb.TxnRequest) int { func (b *BackendQuota) Remaining() int64 { return b.maxBackendBytes - b.be.Size() } + +func maxAllowedOverflowBytes(maxBackendBytes int64) int64 { + allow := maxBackendBytes * 10 / 100 + if allow > MaxAllowedOverflowQuotaBytes { + allow = MaxAllowedOverflowQuotaBytes + } + return allow +} diff --git a/tools/benchmark/cmd/put.go b/tools/benchmark/cmd/put.go index 5e3943ff22d..58f0af3bbd1 100644 --- a/tools/benchmark/cmd/put.go +++ b/tools/benchmark/cmd/put.go @@ -42,8 +42,9 @@ var putCmd = &cobra.Command{ } var ( - keySize int - valSize int + keySize int + valSize int + deltaValSize int putTotal int putRate int @@ -61,6 +62,7 @@ func init() { RootCmd.AddCommand(putCmd) putCmd.Flags().IntVar(&keySize, "key-size", 8, "Key size of put request") putCmd.Flags().IntVar(&valSize, "val-size", 8, "Value size of put request") + putCmd.Flags().IntVar(&deltaValSize, "delta-val-size", 0, "Delta of value size of put request") putCmd.Flags().IntVar(&putRate, "rate", 0, "Maximum puts per second (0 is no limit)") putCmd.Flags().IntVar(&putTotal, "total", 10000, "Total number of put requests") @@ -83,7 +85,7 @@ func putFunc(cmd *cobra.Command, _ []string) { } limit := rate.NewLimiter(rate.Limit(putRate), 1) clients := mustCreateClients(totalClients, totalConns) - k, v := make([]byte, keySize), string(mustRandBytes(valSize)) + k, v := make([]byte, keySize), string(mustRandBytes(valSize+deltaValSize)) bar = pb.New(putTotal) bar.Start() @@ -104,6 +106,7 @@ func putFunc(cmd *cobra.Command, _ []string) { }(clients[i]) } + baseValSize := valSize - deltaValSize go func() { for i := 0; i < putTotal; i++ { if seqKeys { @@ -111,7 +114,11 @@ func putFunc(cmd *cobra.Command, _ []string) { } else { binary.PutVarint(k, int64(rand.Intn(keySpaceSize))) } - requests <- v3.OpPut(string(k), v) + deltaV := v + if deltaValSize > 0 { + deltaV = v[:baseValSize+rand.Intn(2*deltaValSize)] + } + requests <- v3.OpPut(string(k), deltaV) } close(requests) }() diff --git a/tools/benchmark/cmd/root.go b/tools/benchmark/cmd/root.go index 0a22e031b95..e7c3d888ada 100644 --- a/tools/benchmark/cmd/root.go +++ b/tools/benchmark/cmd/root.go @@ -15,6 +15,7 @@ package cmd import ( + "math/rand" "sync" "time" @@ -58,6 +59,8 @@ var ( ) func init() { + rand.Seed(time.Now().UnixNano()) + RootCmd.PersistentFlags().StringSliceVar(&endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints") RootCmd.PersistentFlags().UintVar(&totalConns, "conns", 1, "Total number of gRPC connections") RootCmd.PersistentFlags().UintVar(&totalClients, "clients", 1, "Total number of gRPC clients") diff --git a/tools/benchmark/cmd/txn_put.go b/tools/benchmark/cmd/txn_put.go index 12990c6cbc1..a55f5bf4347 100644 --- a/tools/benchmark/cmd/txn_put.go +++ b/tools/benchmark/cmd/txn_put.go @@ -19,6 +19,7 @@ import ( "encoding/binary" "fmt" "math" + "math/rand" "os" "time" @@ -48,6 +49,7 @@ func init() { RootCmd.AddCommand(txnPutCmd) txnPutCmd.Flags().IntVar(&keySize, "key-size", 8, "Key size of txn put") txnPutCmd.Flags().IntVar(&valSize, "val-size", 8, "Value size of txn put") + txnPutCmd.Flags().IntVar(&deltaValSize, "delta-val-size", 0, "Delta of value size of put request") txnPutCmd.Flags().IntVar(&txnPutOpsPerTxn, "txn-ops", 1, "Number of puts per txn") txnPutCmd.Flags().IntVar(&txnPutRate, 
"rate", 0, "Maximum txns per second (0 is no limit)") @@ -73,7 +75,7 @@ func txnPutFunc(_ *cobra.Command, _ []string) { } limit := rate.NewLimiter(rate.Limit(txnPutRate), 1) clients := mustCreateClients(totalClients, totalConns) - k, v := make([]byte, keySize), string(mustRandBytes(valSize)) + k, v := make([]byte, keySize), string(mustRandBytes(valSize+deltaValSize)) bar = pb.New(txnPutTotal) bar.Start() @@ -93,12 +95,18 @@ func txnPutFunc(_ *cobra.Command, _ []string) { }(clients[i]) } + baseValSize := valSize - deltaValSize go func() { for i := 0; i < txnPutTotal; i++ { ops := make([]v3.Op, txnPutOpsPerTxn) for j := 0; j < txnPutOpsPerTxn; j++ { binary.PutVarint(k, int64(((i*txnPutOpsPerTxn)+j)%keySpaceSize)) - ops[j] = v3.OpPut(string(k), v) + + deltaV := v + if deltaValSize > 0 { + deltaV = v[:baseValSize+rand.Intn(2*deltaValSize)] + } + ops[j] = v3.OpPut(string(k), deltaV) } requests <- ops }