From 8fc7fd1a06dce0bd4d628b42e84cfbbc3d41b269 Mon Sep 17 00:00:00 2001 From: Chao Chen Date: Mon, 11 Oct 2021 00:05:01 -0700 Subject: [PATCH] cherry-pick from #12343 --- client/v3/kv.go | 21 +++ client/v3/leasing/cache.go | 55 ++++++++ client/v3/leasing/kv.go | 81 +++++++++++ client/v3/mock/mockserver/mockserver.go | 4 + client/v3/namespace/kv.go | 22 +++ client/v3/op.go | 32 ++++- client/v3/ordering/kv.go | 26 ++++ client/v3/retry.go | 4 + etcdctl/ctlv3/command/getstream_command.go | 127 ++++++++++++++++++ etcdctl/ctlv3/command/printer.go | 2 + etcdctl/ctlv3/command/printer_fields.go | 8 ++ etcdctl/ctlv3/command/printer_simple.go | 6 + etcdctl/ctlv3/ctl.go | 1 + server/etcdserver/api/v3rpc/key.go | 63 ++++++++- server/etcdserver/apply.go | 100 ++++++++++++++ server/etcdserver/apply_auth.go | 7 + server/etcdserver/corrupt.go | 4 + server/etcdserver/util.go | 11 ++ server/etcdserver/v3_server.go | 43 ++++++ .../grpcproxy/adapter/kv_client_adapter.go | 34 +++++ server/proxy/grpcproxy/kv.go | 94 +++++++++++++ server/storage/mvcc/kv.go | 3 + server/storage/mvcc/kv_view.go | 6 + server/storage/mvcc/kvstore_txn.go | 99 ++++++++++++++ server/storage/mvcc/metrics_txn.go | 10 +- 25 files changed, 859 insertions(+), 4 deletions(-) create mode 100644 etcdctl/ctlv3/command/getstream_command.go diff --git a/client/v3/kv.go b/client/v3/kv.go index 5e9fb7d45896..98b0231cb0ad 100644 --- a/client/v3/kv.go +++ b/client/v3/kv.go @@ -26,6 +26,7 @@ type ( CompactResponse pb.CompactionResponse PutResponse pb.PutResponse GetResponse pb.RangeResponse + GetStreamResponse pb.RangeResponse DeleteResponse pb.DeleteRangeResponse TxnResponse pb.TxnResponse ) @@ -47,6 +48,8 @@ type KV interface { // When passed WithSort(), the keys will be sorted. Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) + GetStream(ctx context.Context, key string, opts ...OpOption) (*GetStreamResponse, error) + // Delete deletes a key, or optionally using WithRange(end), [key, end). Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) @@ -67,12 +70,14 @@ type KV interface { type OpResponse struct { put *PutResponse get *GetResponse + getStream *GetStreamResponse del *DeleteResponse txn *TxnResponse } func (op OpResponse) Put() *PutResponse { return op.put } func (op OpResponse) Get() *GetResponse { return op.get } +func (op OpResponse) GetStream() *GetStreamResponse { return op.getStream } func (op OpResponse) Del() *DeleteResponse { return op.del } func (op OpResponse) Txn() *TxnResponse { return op.txn } @@ -82,6 +87,9 @@ func (resp *PutResponse) OpResponse() OpResponse { func (resp *GetResponse) OpResponse() OpResponse { return OpResponse{get: resp} } +func (resp *GetStreamResponse) OpResponse() OpResponse { + return OpResponse{getStream: resp} +} func (resp *DeleteResponse) OpResponse() OpResponse { return OpResponse{del: resp} } @@ -120,6 +128,11 @@ func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetRespon return r.get, toErr(ctx, err) } +func (kv *kv) GetStream(ctx context.Context, key string, opts ...OpOption) (*GetStreamResponse, error) { + r, err := kv.Do(ctx, OpGetStream(key, opts...)) + return r.getStream, toErr(ctx, err) +} + func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) { r, err := kv.Do(ctx, OpDelete(key, opts...)) return r.del, toErr(ctx, err) @@ -150,6 +163,14 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { if err == nil { return OpResponse{get: (*GetResponse)(resp)}, nil } + case tRangeStream: + var rangeStreamClient pb.KV_RangeStreamClient + var resp *pb.RangeResponse + rangeStreamClient, err = kv.openRangeStreamClient(ctx, op.toRangeStreamRequest(), kv.callOpts...) + resp, err = kv.serveRangeStream(ctx, rangeStreamClient) + if err == nil { + return OpResponse{getStream: (*GetStreamResponse)(resp)}, nil + } case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease} diff --git a/client/v3/leasing/cache.go b/client/v3/leasing/cache.go index 214ee2fc196d..d37400dd4d13 100644 --- a/client/v3/leasing/cache.go +++ b/client/v3/leasing/cache.go @@ -136,6 +136,18 @@ func (lc *leaseCache) Add(key string, resp *v3.GetResponse, op v3.Op) *v3.GetRes return ret } +func (lc *leaseCache) AddStream(key string, resp *v3.GetStreamResponse, op v3.Op) *v3.GetStreamResponse { + lk := &leaseKey{(*v3.GetResponse)(resp), resp.Header.Revision, closedCh} + lc.mu.Lock() + if lc.header == nil || lc.header.Revision < resp.Header.Revision { + lc.header = resp.Header + } + lc.entries[key] = lk + ret := lk.getStream(op) + lc.mu.Unlock() + return ret +} + func (lc *leaseCache) Update(key, val []byte, respHeader *v3pb.ResponseHeader) { li := lc.entries[string(key)] if li == nil { @@ -216,6 +228,28 @@ func (lc *leaseCache) Get(ctx context.Context, op v3.Op) (*v3.GetResponse, bool) return ret, true } + +func (lc *leaseCache) GetStream(ctx context.Context, op v3.Op) (*v3.GetStreamResponse, bool) { + if isBadOp(op) { + return nil, false + } + key := string(op.KeyBytes()) + li, wc := lc.notify(key) + if li == nil { + return nil, true + } + select { + case <-wc: + case <-ctx.Done(): + return nil, true + } + lc.mu.RLock() + lk := *li + ret := lk.getStream(op) + lc.mu.RUnlock() + return ret, true +} + func (lk *leaseKey) get(op v3.Op) *v3.GetResponse { ret := *lk.response ret.Header = copyHeader(ret.Header) @@ -239,6 +273,27 @@ func (lk *leaseKey) get(op v3.Op) *v3.GetResponse { return &ret } +func (lk *leaseKey) getStream(op v3.Op) *v3.GetStreamResponse { + ret := *lk.response + ret.Header = copyHeader(ret.Header) + empty := len(ret.Kvs) == 0 + if empty { + ret.Kvs = nil + } else { + kv := *ret.Kvs[0] + kv.Key = make([]byte, len(kv.Key)) + copy(kv.Key, ret.Kvs[0].Key) + if !op.IsKeysOnly() { + kv.Value = make([]byte, len(kv.Value)) + copy(kv.Value, ret.Kvs[0].Value) + } + ret.Kvs = []*mvccpb.KeyValue{&kv} + } + + retNew := (v3.GetStreamResponse)(ret) + return &retNew +} + func (lc *leaseCache) notify(key string) (*leaseKey, <-chan struct{}) { lc.mu.RLock() defer lc.mu.RUnlock() diff --git a/client/v3/leasing/kv.go b/client/v3/leasing/kv.go index f0cded20feaf..471418afd80e 100644 --- a/client/v3/leasing/kv.go +++ b/client/v3/leasing/kv.go @@ -86,6 +86,10 @@ func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption) return lkv.get(ctx, v3.OpGet(key, opts...)) } +func (lkv *leasingKV) GetStream(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetStreamResponse, error) { + return lkv.getStream(ctx, v3.OpGetStream(key, opts...)) +} + func (lkv *leasingKV) Put(ctx context.Context, key, val string, opts ...v3.OpOption) (*v3.PutResponse, error) { return lkv.put(ctx, v3.OpPut(key, val, opts...)) } @@ -99,6 +103,9 @@ func (lkv *leasingKV) Do(ctx context.Context, op v3.Op) (v3.OpResponse, error) { case op.IsGet(): resp, err := lkv.get(ctx, op) return resp.OpResponse(), err + case op.IsGetStream(): + resp, err := lkv.getStream(ctx, op) + return resp.OpResponse(), err case op.IsPut(): resp, err := lkv.put(ctx, op) return resp.OpResponse(), err @@ -331,6 +338,80 @@ func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error return getResp, nil } +func (lkv *leasingKV) acquireStream(ctx context.Context, key string, op v3.Op) (*v3.TxnResponse, error) { + for ctx.Err() == nil { + if err := lkv.waitSession(ctx); err != nil { + return nil, err + } + lcmp := v3.Cmp{Key: []byte(key), Target: pb.Compare_LEASE} + resp, err := lkv.kv.Txn(ctx).If( + v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0), + v3.Compare(lcmp, "=", 0)). + Then( + op, + v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.leaseID()))). + Else( + op, + v3.OpGetStream(lkv.pfx+key), + ).Commit() + if err == nil { + if !resp.Succeeded { + kvs := resp.Responses[1].GetResponseRange().Kvs + // if txn failed since already owner, lease is acquired + resp.Succeeded = len(kvs) > 0 && v3.LeaseID(kvs[0].Lease) == lkv.leaseID() + } + return resp, nil + } + // retry if transient error + if _, ok := err.(rpctypes.EtcdError); ok { + return nil, err + } + if ev, ok := status.FromError(err); ok && ev.Code() != codes.Unavailable { + return nil, err + } + } + return nil, ctx.Err() +} + +func (lkv *leasingKV) getStream(ctx context.Context, op v3.Op) (*v3.GetStreamResponse, error) { + do := func() (*v3.GetStreamResponse, error) { + r, err := lkv.kv.Do(ctx, op) + return r.GetStream(), err + } + if !lkv.readySession() { + return do() + } + + if resp, ok := lkv.leases.GetStream(ctx, op); resp != nil { + return resp, nil + } else if !ok || op.IsSerializable() { + // must be handled by server or can skip linearization + return do() + } + + key := string(op.KeyBytes()) + if !lkv.leases.MayAcquire(key) { + resp, err := lkv.kv.Do(ctx, op) + return resp.GetStream(), err + } + + resp, err := lkv.acquireStream(ctx, key, v3.OpGetStream(key)) + if err != nil { + return nil, err + } + getResp := (*v3.GetStreamResponse)(resp.Responses[0].GetResponseRange()) + getResp.Header = resp.Header + if resp.Succeeded { + getResp = lkv.leases.AddStream(key, getResp, op) + lkv.wg.Add(1) + go func() { + defer lkv.wg.Done() + lkv.monitorLease(ctx, key, resp.Header.Revision) + }() + } + return getResp, nil +} + func (lkv *leasingKV) deleteRangeRPC(ctx context.Context, maxLeaseRev int64, key, end string) (*v3.DeleteResponse, error) { lkey, lend := lkv.pfx+key, lkv.pfx+end resp, err := lkv.kv.Txn(ctx).If( diff --git a/client/v3/mock/mockserver/mockserver.go b/client/v3/mock/mockserver/mockserver.go index 21a8b013b405..801352912962 100644 --- a/client/v3/mock/mockserver/mockserver.go +++ b/client/v3/mock/mockserver/mockserver.go @@ -171,6 +171,10 @@ func (m *mockKVServer) Range(context.Context, *pb.RangeRequest) (*pb.RangeRespon return &pb.RangeResponse{}, nil } +func (m *mockKVServer) RangeStream(r *pb.RangeRequest, rss pb.KV_RangeStreamServer) error { + return nil +} + func (m *mockKVServer) Put(context.Context, *pb.PutRequest) (*pb.PutResponse, error) { return &pb.PutResponse{}, nil } diff --git a/client/v3/namespace/kv.go b/client/v3/namespace/kv.go index f745225cacd4..a6d5e6a7f44e 100644 --- a/client/v3/namespace/kv.go +++ b/client/v3/namespace/kv.go @@ -60,6 +60,20 @@ func (kv *kvPrefix) Get(ctx context.Context, key string, opts ...clientv3.OpOpti return get, nil } + +func (kv *kvPrefix) GetStream(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetStreamResponse, error) { + if len(key) == 0 { + return nil, rpctypes.ErrEmptyKey + } + r, err := kv.KV.Do(ctx, kv.prefixOp(clientv3.OpGetStream(key, opts...))) + if err != nil { + return nil, err + } + get := r.GetStream() + kv.unprefixGetStreamResponse(get) + return get, nil +} + func (kv *kvPrefix) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { if len(key) == 0 && !(clientv3.IsOptsWithFromKey(opts) || clientv3.IsOptsWithPrefix(opts)) { return nil, rpctypes.ErrEmptyKey @@ -84,6 +98,8 @@ func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse switch { case r.Get() != nil: kv.unprefixGetResponse(r.Get()) + case r.GetStream() != nil: + kv.unprefixGetStreamResponse(r.GetStream()) case r.Put() != nil: kv.unprefixPutResponse(r.Put()) case r.Del() != nil: @@ -144,6 +160,12 @@ func (kv *kvPrefix) unprefixGetResponse(resp *clientv3.GetResponse) { } } +func (kv *kvPrefix) unprefixGetStreamResponse(resp *clientv3.GetStreamResponse) { + for i := range resp.Kvs { + resp.Kvs[i].Key = resp.Kvs[i].Key[len(kv.pfx):] + } +} + func (kv *kvPrefix) unprefixPutResponse(resp *clientv3.PutResponse) { if resp.PrevKv != nil { resp.PrevKv.Key = resp.PrevKv.Key[len(kv.pfx):] diff --git a/client/v3/op.go b/client/v3/op.go index e8c0c1e08c96..ca883ea38cbc 100644 --- a/client/v3/op.go +++ b/client/v3/op.go @@ -24,6 +24,7 @@ const ( tPut tDeleteRange tTxn + tRangeStream ) var noPrefixEnd = []byte{0} @@ -112,6 +113,8 @@ func (op Op) IsPut() bool { return op.t == tPut } // IsGet returns true iff the operation is a Get. func (op Op) IsGet() bool { return op.t == tRange } +func (op Op) IsGetStream() bool { return op.t == tRangeStream } + // IsDelete returns true iff the operation is a Delete. func (op Op) IsDelete() bool { return op.t == tDeleteRange } @@ -169,6 +172,21 @@ func (op Op) toRangeRequest() *pb.RangeRequest { return r } +func (op Op) toRangeStreamRequest() *pb.RangeRequest { + if op.t != tRangeStream { + panic("op.t != tRangeStream") + } + r := &pb.RangeRequest{ + Key: op.key, + RangeEnd: op.end, + Limit: op.limit, + Revision: op.rev, + Serializable: op.serializable, + KeysOnly: op.keysOnly, + } + return r +} + func (op Op) toTxnRequest() *pb.TxnRequest { thenOps := make([]*pb.RequestOp, len(op.thenOps)) for i, tOp := range op.thenOps { @@ -189,6 +207,8 @@ func (op Op) toRequestOp() *pb.RequestOp { switch op.t { case tRange: return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: op.toRangeRequest()}} + case tRangeStream: + return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: op.toRangeStreamRequest()}} case tPut: r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease} return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}} @@ -216,7 +236,7 @@ func (op Op) isWrite() bool { } return false } - return op.t != tRange + return op.t != tRange && op.t != tRangeStream } func NewOp() *Op { @@ -234,6 +254,16 @@ func OpGet(key string, opts ...OpOption) Op { return ret } +func OpGetStream(key string, opts ...OpOption) Op { + // WithPrefix and WithFromKey are not supported together + if IsOptsWithPrefix(opts) && IsOptsWithFromKey(opts) { + panic("`WithPrefix` and `WithFromKey` cannot be set at the same time, choose one") + } + ret := Op{t: tRangeStream, key: []byte(key)} + ret.applyOpts(opts) + return ret +} + // OpDelete returns "delete" operation based on given key and operation options. func OpDelete(key string, opts ...OpOption) Op { // WithPrefix and WithFromKey are not supported together diff --git a/client/v3/ordering/kv.go b/client/v3/ordering/kv.go index 7914fc4b9c52..f70e61f63d87 100644 --- a/client/v3/ordering/kv.go +++ b/client/v3/ordering/kv.go @@ -75,6 +75,32 @@ func (kv *kvOrdering) Get(ctx context.Context, key string, opts ...clientv3.OpOp } } +func (kv *kvOrdering) GetStream(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetStreamResponse, error) { + // prevRev is stored in a local variable in order to record the prevRev + // at the beginning of the Get operation, because concurrent + // access to kvOrdering could change the prevRev field in the + // middle of the Get operation. + prevRev := kv.getPrevRev() + op := clientv3.OpGetStream(key, opts...) + for { + r, err := kv.KV.Do(ctx, op) + if err != nil { + return nil, err + } + resp := r.GetStream() + if resp.Header.Revision == prevRev { + return resp, nil + } else if resp.Header.Revision > prevRev { + kv.setPrevRev(resp.Header.Revision) + return resp, nil + } + err = kv.orderViolationFunc(op, r, prevRev) + if err != nil { + return nil, err + } + } +} + func (kv *kvOrdering) Txn(ctx context.Context) clientv3.Txn { return &txnOrdering{ kv.KV.Txn(ctx), diff --git a/client/v3/retry.go b/client/v3/retry.go index 69ecc6314719..98272795272d 100644 --- a/client/v3/retry.go +++ b/client/v3/retry.go @@ -105,6 +105,10 @@ func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts . return rkv.kc.Range(ctx, in, append(opts, withRetryPolicy(repeatable))...) } +func (rkv *retryKVClient) RangeStream(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (pb.KV_RangeStreamClient, error) { + return rkv.kc.RangeStream(ctx, in, append(opts, withRetryPolicy(repeatable))...) +} + func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) { return rkv.kc.Put(ctx, in, opts...) } diff --git a/etcdctl/ctlv3/command/getstream_command.go b/etcdctl/ctlv3/command/getstream_command.go new file mode 100644 index 000000000000..d49cc13d3db1 --- /dev/null +++ b/etcdctl/ctlv3/command/getstream_command.go @@ -0,0 +1,127 @@ +// Copyright 2021 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "fmt" + + "github.com/spf13/cobra" + "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/pkg/v3/cobrautl" +) + +var ( + getStreamConsistency string + getStreamLimit int64 + getStreamPrefix bool + getStreamFromKey bool + getStreamRev int64 + getStreamKeysOnly bool + printStreamValueOnly bool +) + +// NewGetStreamCommand returns the cobra command for "getstream". +func NewGetStreamCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "getstream [options] [range_end]", + Short: "Gets the key or a range of keys by stream", + Run: getStreamCommandFunc, + } + + cmd.Flags().StringVar(&getStreamConsistency, "consistency", "l", "Linearizable(l) or Serializable(s)") + cmd.Flags().Int64Var(&getStreamLimit, "limit", 0, "Maximum number of results") + cmd.Flags().BoolVar(&getStreamPrefix, "prefix", false, "Get keys with matching prefix") + cmd.Flags().BoolVar(&getStreamFromKey, "from-key", false, "Get keys that are greater than or equal to the given key using byte compare") + cmd.Flags().Int64Var(&getStreamRev, "rev", 0, "Specify the kv revision") + cmd.Flags().BoolVar(&getStreamKeysOnly, "keys-only", false, "Get only the keys") + cmd.Flags().BoolVar(&printStreamValueOnly, "print-value-only", false, `Only write values when using the "simple" output format`) + return cmd +} + +// getStreamCommandFunc executes the "getstream" command. +func getStreamCommandFunc(cmd *cobra.Command, args []string) { + key, opts := getGetStreamOp(args) + ctx, cancel := commandCtx(cmd) + resp, err := mustClientFromCmd(cmd).GetStream(ctx, key, opts...) + cancel() + if err != nil { + cobrautl.ExitWithError(cobrautl.ExitError, err) + } + + if printStreamValueOnly { + dp, simple := (display).(*simplePrinter) + if !simple { + cobrautl.ExitWithError(cobrautl.ExitBadArgs, fmt.Errorf("print-value-only is only for `--write-out=simple`")) + } + dp.valueOnly = true + } + display.GetStream(*resp) +} + +func getGetStreamOp(args []string) (string, []clientv3.OpOption) { + if len(args) == 0 { + cobrautl.ExitWithError(cobrautl.ExitBadArgs, fmt.Errorf("get command needs one argument as key and an optional argument as range_end")) + } + + if getStreamPrefix && getStreamFromKey { + cobrautl.ExitWithError(cobrautl.ExitBadArgs, fmt.Errorf("`--prefix` and `--from-key` cannot be set at the same time, choose one")) + } + + opts := []clientv3.OpOption{} + switch getStreamConsistency { + case "s": + opts = append(opts, clientv3.WithSerializable()) + case "l": + default: + cobrautl.ExitWithError(cobrautl.ExitBadFeature, fmt.Errorf("unknown consistency flag %q", getStreamConsistency)) + } + + key := args[0] + if len(args) > 1 { + if getStreamPrefix || getStreamFromKey { + cobrautl.ExitWithError(cobrautl.ExitBadArgs, fmt.Errorf("too many arguments, only accept one argument when `--prefix` or `--from-key` is set")) + } + opts = append(opts, clientv3.WithRange(args[1])) + } + + opts = append(opts, clientv3.WithLimit(getStreamLimit)) + if getStreamRev > 0 { + opts = append(opts, clientv3.WithRev(getStreamRev)) + } + + if getStreamPrefix { + if len(key) == 0 { + key = "\x00" + opts = append(opts, clientv3.WithFromKey()) + } else { + opts = append(opts, clientv3.WithPrefix()) + } + } + + if getStreamFromKey { + if len(key) == 0 { + key = "\x00" + } + opts = append(opts, clientv3.WithFromKey()) + } + + if getStreamKeysOnly { + opts = append(opts, clientv3.WithKeysOnly()) + } + + return key, opts +} + + diff --git a/etcdctl/ctlv3/command/printer.go b/etcdctl/ctlv3/command/printer.go index 2d31d9ec8c6b..aa059f11abad 100644 --- a/etcdctl/ctlv3/command/printer.go +++ b/etcdctl/ctlv3/command/printer.go @@ -29,6 +29,7 @@ import ( type printer interface { Del(v3.DeleteResponse) Get(v3.GetResponse) + GetStream(v3.GetStreamResponse) Put(v3.PutResponse) Txn(v3.TxnResponse) Watch(v3.WatchResponse) @@ -93,6 +94,7 @@ type printerRPC struct { func (p *printerRPC) Del(r v3.DeleteResponse) { p.p((*pb.DeleteRangeResponse)(&r)) } func (p *printerRPC) Get(r v3.GetResponse) { p.p((*pb.RangeResponse)(&r)) } +func (p *printerRPC) GetStream(r v3.GetStreamResponse) { p.p((*pb.RangeResponse)(&r)) } func (p *printerRPC) Put(r v3.PutResponse) { p.p((*pb.PutResponse)(&r)) } func (p *printerRPC) Txn(r v3.TxnResponse) { p.p((*pb.TxnResponse)(&r)) } func (p *printerRPC) Watch(r v3.WatchResponse) { p.p(&r) } diff --git a/etcdctl/ctlv3/command/printer_fields.go b/etcdctl/ctlv3/command/printer_fields.go index ca4611c735ca..92209c4160ae 100644 --- a/etcdctl/ctlv3/command/printer_fields.go +++ b/etcdctl/ctlv3/command/printer_fields.go @@ -57,6 +57,14 @@ func (p *fieldsPrinter) Get(r v3.GetResponse) { fmt.Println(`"Count" :`, r.Count) } +func (p *fieldsPrinter) GetStream(r v3.GetStreamResponse) { + p.hdr(r.Header) + for _, kv := range r.Kvs { + p.kv("", kv) + } + fmt.Println(`"Count" :`, r.Count) +} + func (p *fieldsPrinter) Put(r v3.PutResponse) { p.hdr(r.Header) if r.PrevKv != nil { diff --git a/etcdctl/ctlv3/command/printer_simple.go b/etcdctl/ctlv3/command/printer_simple.go index 14028c614457..44b59027c086 100644 --- a/etcdctl/ctlv3/command/printer_simple.go +++ b/etcdctl/ctlv3/command/printer_simple.go @@ -44,6 +44,12 @@ func (s *simplePrinter) Get(resp v3.GetResponse) { } } +func (s *simplePrinter) GetStream(resp v3.GetStreamResponse) { + for _, kv := range resp.Kvs { + printKV(s.isHex, s.valueOnly, kv) + } +} + func (s *simplePrinter) Put(r v3.PutResponse) { fmt.Println("OK") if r.PrevKv != nil { diff --git a/etcdctl/ctlv3/ctl.go b/etcdctl/ctlv3/ctl.go index 8de7a77689df..e29018828858 100644 --- a/etcdctl/ctlv3/ctl.go +++ b/etcdctl/ctlv3/ctl.go @@ -76,6 +76,7 @@ func init() { rootCmd.AddCommand( command.NewGetCommand(), + command.NewGetStreamCommand(), command.NewPutCommand(), command.NewDelCommand(), command.NewTxnCommand(), diff --git a/server/etcdserver/api/v3rpc/key.go b/server/etcdserver/api/v3rpc/key.go index d1a7ee633455..7dd656e87c37 100644 --- a/server/etcdserver/api/v3rpc/key.go +++ b/server/etcdserver/api/v3rpc/key.go @@ -17,6 +17,7 @@ package v3rpc import ( "context" + "go.uber.org/zap" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" @@ -32,10 +33,12 @@ type kvServer struct { // Txn.Success can have at most 128 operations, // and Txn.Failure can have at most 128 operations. maxTxnOps uint + + lg *zap.Logger } func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer { - return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps} + return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps, lg: s.Cfg.Logger} } func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { @@ -52,6 +55,64 @@ func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResp return resp, nil } +func (s *kvServer) RangeStream(r *pb.RangeRequest, rss pb.KV_RangeStreamServer) error { + defer func() { + if err := recover(); err != nil { + switch e := err.(type) { + case error: + s.lg.Error( + "kvServer RangeStream() panic error", zap.Error(e)) + } + } + }() + + if err := checkRangeRequest(r); err != nil { + return err + } + + respC := make(chan *pb.RangeResponse) + errC := make(chan error) + + go func() { + err := s.kv.RangeStream(rss.Context(), r, respC, errC) + if err != nil { + s.lg.Error("EtcdServer RangeStream error", zap.Error(togRPCError(err))) + } + }() + +Loop: + for { + select { + case resp := <-respC: + if resp == nil { + break Loop + } + if resp.Kvs == nil || len(resp.Kvs) == 0 { + break Loop + } + + s.hdr.fill(resp.Header) + + serr := rss.Send(resp) + if serr != nil { + if isClientCtxErr(rss.Context().Err(), serr) { + s.lg.Debug("failed to send range stream response to gRPC stream", zap.Error(serr)) + } else { + s.lg.Warn("failed to send range stream response to gRPC stream", zap.Error(serr)) + streamFailures.WithLabelValues("send", "rangeStream").Inc() + } + return nil + } + case err := <-errC: + return err + case <-rss.Context().Done(): + return rss.Context().Err() + } + } + + return nil +} + func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { if err := checkPutRequest(r); err != nil { return nil, err diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index c8ff51674219..5c10cb334a89 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -67,6 +67,7 @@ type applierV3 interface { Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) + RangeStream(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest, rspC chan *pb.RangeResponse, errC chan error) (*pb.RangeResponse, error) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) @@ -434,6 +435,105 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra return resp, nil } +func (a *applierV3backend) RangeStream(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest, rspC chan *pb.RangeResponse, errC chan error) (*pb.RangeResponse, error) { + defer func() { + if err := recover(); err != nil { + switch e := err.(type) { + case error: + a.s.lg.Error( + "applierV3backend RangeStream() panic error", zap.Error(e)) + } + } + }() + + defer close(rspC) + defer close(errC) + + trace := traceutil.Get(ctx) + + lg := a.s.lg + + resp := &pb.RangeResponse{} + resp.Header = &pb.ResponseHeader{} + streamC := make(chan *mvcc.RangeResult) + + var err error + + if txn == nil { + txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace) + defer txn.End() + } + + limit := r.Limit + ro := mvcc.RangeOptions{ + Limit: limit, + Rev: r.Revision, + } + + go func() { + err = txn.RangeStream(r.Key, mkGteRange(r.RangeEnd), ro, streamC) + if err != nil { + lg.Error("storeTxnRead RangeStream error", zap.Error(err)) + } + }() + +Loop: + for { + select { + case rr := <-streamC: + if rr == nil { + select { + case rspC <- nil: + case <-ctx.Done(): + return nil, ctx.Err() + } + break Loop + } + + if rr.Err != nil { + select { + case errC <- rr.Err: + return nil, rr.Err + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + subResp := &pb.RangeResponse{} + subResp.Header = &pb.ResponseHeader{} + + subResp.Header.Revision = rr.Rev + subResp.SubCount = int64(rr.Count) + + resp.Count += int64(rr.Count) + subResp.Count = resp.Count + subResp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs)) + for i := range rr.KVs { + if r.KeysOnly { + rr.KVs[i].Value = nil + } + subResp.Kvs[i] = &rr.KVs[i] + } + // resp TotalSize just use monitor long time range stream + resp.TotalSize += int64(proto.Size(subResp)) + // resp Header.Revision just use monitor long time range stream + resp.Header.Revision = subResp.Header.Revision + + select { + case rspC <- subResp: + case <-ctx.Done(): + return nil, ctx.Err() + } + + case <-ctx.Done(): + return nil, ctx.Err() + } + } + + trace.Step("assemble the response") + return resp, nil +} + func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { trace := traceutil.Get(ctx) if trace.IsEmpty() { diff --git a/server/etcdserver/apply_auth.go b/server/etcdserver/apply_auth.go index bf043aa731b8..f05d8d7b58a6 100644 --- a/server/etcdserver/apply_auth.go +++ b/server/etcdserver/apply_auth.go @@ -93,6 +93,13 @@ func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Rang return aa.applierV3.Range(ctx, txn, r) } +func (aa *authApplierV3) RangeStream(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest, rspC chan *pb.RangeResponse, errC chan error) (*pb.RangeResponse, error) { + if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil { + return nil, err + } + return aa.applierV3.RangeStream(ctx, txn, r, rspC, errC) +} + func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { if err := aa.as.IsDeleteRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil { return nil, err diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index bc1b4674ae03..7f08481b2028 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -317,6 +317,10 @@ func (a *applierV3Corrupt) Range(ctx context.Context, txn mvcc.TxnRead, p *pb.Ra return nil, ErrCorrupt } +func (a *applierV3Corrupt) RangeStream(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest, rspC chan *pb.RangeResponse, errC chan error) (*pb.RangeResponse, error) { + return nil, ErrCorrupt +} + func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { return nil, ErrCorrupt } diff --git a/server/etcdserver/util.go b/server/etcdserver/util.go index 6ad5f0f4c6da..13b3694926d2 100644 --- a/server/etcdserver/util.go +++ b/server/etcdserver/util.go @@ -161,6 +161,17 @@ func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, warningApplyDuration ti warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "read-only range ", resp, err) } +func warnOfExpensiveReadOnlyRangeStreamRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) { + if time.Since(now) <= warningApplyDuration { + return + } + var resp string + if !isNil(rangeResponse) { + resp = fmt.Sprintf("range_stream_response_total_count:%d, size:%d", rangeResponse.GetCount(), rangeResponse.GetTotalSize()) + } + warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "read-only rangeStream ", resp, err) +} + // callers need make sure time has passed warningApplyDuration func warnOfExpensiveGenericRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) { lg.Warn( diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 9885fc01c031..61e56e6cc4e3 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -48,6 +48,7 @@ const ( type RaftKV interface { Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) + RangeStream(ctx context.Context, r *pb.RangeRequest, rspC chan *pb.RangeResponse, errC chan error) error Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) @@ -131,6 +132,48 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe return resp, err } +func (s *EtcdServer) RangeStream(ctx context.Context, r *pb.RangeRequest, rspC chan *pb.RangeResponse, errC chan error) error { + trace := traceutil.New("rangeStream", + s.lg, + traceutil.Field{Key: "range_begin", Value: string(r.Key)}, + traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)}, + ) + ctx = context.WithValue(ctx, traceutil.TraceKey, trace) + + var resp *pb.RangeResponse + var err error + + defer func(start time.Time) { + warnOfExpensiveReadOnlyRangeStreamRequest(s.lg, start, r, resp, err) + if resp != nil { + trace.AddField( + traceutil.Field{Key: "response_total_count", Value: resp.GetCount()}, + traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}, + ) + } + trace.LogIfLong(traceThreshold) + }(time.Now()) + + if !r.Serializable { + err = s.linearizableReadNotify(ctx) + trace.Step("agreement among raft nodes before linearized reading") + if err != nil { + return err + } + } + + chk := func(ai *auth.AuthInfo) error { + return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) + } + + get := func() { resp, err = s.applyV3Base.RangeStream(ctx, nil, r, rspC, errC) } + if serr := s.doSerialize(ctx, chk, get); serr != nil { + err = serr + return err + } + return err +} + func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now()) resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) diff --git a/server/proxy/grpcproxy/adapter/kv_client_adapter.go b/server/proxy/grpcproxy/adapter/kv_client_adapter.go index ddb6ada47327..b409ef01547e 100644 --- a/server/proxy/grpcproxy/adapter/kv_client_adapter.go +++ b/server/proxy/grpcproxy/adapter/kv_client_adapter.go @@ -32,6 +32,13 @@ func (s *kvs2kvc) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.C return s.kvs.Range(ctx, in) } +func (s *kvs2kvc) RangeStream(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (pb.KV_RangeStreamClient, error) { + cs := newPipeStream(ctx, func(ss chanServerStream) error { + return s.kvs.RangeStream(in, (&rs2rcServerStream{ss})) + }) + return &rs2rcClientStream{cs}, nil +} + func (s *kvs2kvc) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (*pb.PutResponse, error) { return s.kvs.Put(ctx, in) } @@ -47,3 +54,30 @@ func (s *kvs2kvc) Txn(ctx context.Context, in *pb.TxnRequest, opts ...grpc.CallO func (s *kvs2kvc) Compact(ctx context.Context, in *pb.CompactionRequest, opts ...grpc.CallOption) (*pb.CompactionResponse, error) { return s.kvs.Compact(ctx, in) } + + +type rs2rcClientStream struct{ chanClientStream } + +type rs2rcServerStream struct{ chanServerStream } + +func (s *rs2rcClientStream) Send(wr *pb.RangeRequest) error { + return s.SendMsg(wr) +} +func (s *rs2rcClientStream) Recv() (*pb.RangeResponse, error) { + var v interface{} + if err := s.RecvMsg(&v); err != nil { + return nil, err + } + return v.(*pb.RangeResponse), nil +} + +func (s *rs2rcServerStream) Send(wr *pb.RangeResponse) error { + return s.SendMsg(wr) +} +func (s *rs2rcServerStream) Recv() (*pb.RangeRequest, error) { + var v interface{} + if err := s.RecvMsg(&v); err != nil { + return nil, err + } + return v.(*pb.RangeRequest), nil +} diff --git a/server/proxy/grpcproxy/kv.go b/server/proxy/grpcproxy/kv.go index 6e88eb9fb952..10a120dd907a 100644 --- a/server/proxy/grpcproxy/kv.go +++ b/server/proxy/grpcproxy/kv.go @@ -22,6 +22,8 @@ import ( "go.etcd.io/etcd/server/v3/proxy/grpcproxy/cache" ) +const RangeStreamBatch int = 500 + type kvProxy struct { kv clientv3.KV cache cache.Cache @@ -67,6 +69,81 @@ func (p *kvProxy) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRespo return gresp, nil } +func (p *kvProxy) RangeStream(r *pb.RangeRequest, rss pb.KV_RangeStreamServer) error { + + sendFunc := func(resp *pb.RangeResponse) (err error) { + subRSP := &pb.RangeResponse{} + subRSP.Header = &pb.ResponseHeader{} + + if resp.Kvs == nil || len(resp.Kvs) == 0 { + err = rss.Send(subRSP) + if err != nil { + return err + } + return nil + } + + start := int64(0) + end := int64(RangeStreamBatch) + for { + if end > resp.Count { + end = resp.Count + } + subRSP.Kvs = resp.Kvs[start:end] + subRSP.Count = subRSP.Count + subRSP.Header = subRSP.Header + err = rss.Send(subRSP) + if err != nil { + return err + } + if resp.Count <= end { + break + } + + start = end + end += int64(RangeStreamBatch) + } + return nil + } + + if r.Serializable { + resp, err := p.cache.Get(r) + switch err { + case nil: + cacheHits.Inc() + err := sendFunc(resp) + if err != nil { + return err + } + return nil + case cache.ErrCompacted: + cacheHits.Inc() + return err + } + + cachedMisses.Inc() + } + + resp, err := p.kv.Do(rss.Context(), RangeStreamRequestToOp(r)) + if err != nil { + return err + } + + // cache linearizable as serializable + req := *r + req.Serializable = true + gresp := (*pb.RangeResponse)(resp.GetStream()) + err = sendFunc(gresp) + if err != nil { + return err + } + + p.cache.Add(&req, gresp) + cacheKeys.Set(float64(p.cache.Size())) + + return nil +} + func (p *kvProxy) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { p.cache.Invalidate(r.Key, nil) cacheKeys.Set(float64(p.cache.Size())) @@ -189,6 +266,23 @@ func RangeRequestToOp(r *pb.RangeRequest) clientv3.Op { return clientv3.OpGet(string(r.Key), opts...) } +func RangeStreamRequestToOp(r *pb.RangeRequest) clientv3.Op { + opts := []clientv3.OpOption{} + if len(r.RangeEnd) != 0 { + opts = append(opts, clientv3.WithRange(string(r.RangeEnd))) + } + opts = append(opts, clientv3.WithRev(r.Revision)) + opts = append(opts, clientv3.WithLimit(r.Limit)) + if r.KeysOnly { + opts = append(opts, clientv3.WithKeysOnly()) + } + if r.Serializable { + opts = append(opts, clientv3.WithSerializable()) + } + + return clientv3.OpGetStream(string(r.Key), opts...) +} + func PutRequestToOp(r *pb.PutRequest) clientv3.Op { opts := []clientv3.OpOption{} opts = append(opts, clientv3.WithLease(clientv3.LeaseID(r.Lease))) diff --git a/server/storage/mvcc/kv.go b/server/storage/mvcc/kv.go index 10c4821b1463..ff5ec57f369c 100644 --- a/server/storage/mvcc/kv.go +++ b/server/storage/mvcc/kv.go @@ -33,6 +33,7 @@ type RangeResult struct { KVs []mvccpb.KeyValue Rev int64 Count int + Err error } type ReadView interface { @@ -53,6 +54,8 @@ type ReadView interface { // Limit limits the number of keys returned. // If the required rev is compacted, ErrCompacted will be returned. Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) + + RangeStream(key, end []byte, ro RangeOptions, streamC chan *RangeResult) (err error) } // TxnRead represents a read-only transaction with operations that will not diff --git a/server/storage/mvcc/kv_view.go b/server/storage/mvcc/kv_view.go index 56260e7599a2..f35e6b02f3a8 100644 --- a/server/storage/mvcc/kv_view.go +++ b/server/storage/mvcc/kv_view.go @@ -41,6 +41,12 @@ func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) return tr.Range(ctx, key, end, ro) } +func (rv *readView) RangeStream(key, end []byte, ro RangeOptions, streamC chan *RangeResult) (err error) { + tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO()) + defer tr.End() + return tr.RangeStream(key, end, ro, streamC) +} + type writeView struct{ kv KV } func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) { diff --git a/server/storage/mvcc/kvstore_txn.go b/server/storage/mvcc/kvstore_txn.go index 56c2335c2a61..33a65a6071cb 100644 --- a/server/storage/mvcc/kvstore_txn.go +++ b/server/storage/mvcc/kvstore_txn.go @@ -25,6 +25,8 @@ import ( "go.uber.org/zap" ) +const RangeStreamBatch = 500 + type storeTxnRead struct { s *store tx backend.ReadTx @@ -62,6 +64,10 @@ func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOpti return tr.rangeKeys(ctx, key, end, tr.Rev(), ro) } +func (tr *storeTxnRead) RangeStream(key, end []byte, ro RangeOptions, streamC chan *RangeResult) (err error) { + return tr.rangeStreamKeys(key, end, tr.Rev(), ro, streamC) +} + func (tr *storeTxnRead) End() { tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx. tr.s.mu.RUnlock() @@ -98,6 +104,14 @@ func (tw *storeTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOpt return tw.rangeKeys(ctx, key, end, rev, ro) } +func (tw *storeTxnWrite) RangeStream(key, end []byte, ro RangeOptions, streamC chan *RangeResult) (err error) { + rev := tw.beginRev + if len(tw.changes) > 0 { + rev++ + } + return tw.rangeStreamKeys(key, end, rev, ro, streamC) +} + func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) { if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 { return n, tw.beginRev + 1 @@ -179,6 +193,91 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil } +func (tr *storeTxnRead) rangeStreamKeys(key, end []byte, curRev int64, ro RangeOptions, streamC chan *RangeResult) error { + defer func() { + if err := recover(); err != nil { + switch e := err.(type) { + case error: + tr.s.lg.Error( + "storeTxnRead rangeStreamKeys() panic error", zap.Error(e)) + } + } + }() + + defer close(streamC) + + rev := ro.Rev + if rev > curRev { + streamC <- &RangeResult{KVs: nil, Count: -1, Rev: curRev, Err: ErrFutureRev} + return ErrFutureRev + } + if rev <= 0 { + rev = curRev + } + if rev < tr.s.compactMainRev { + streamC <- &RangeResult{KVs: nil, Count: -1, Rev: 0, Err: ErrCompacted} + return ErrCompacted + } + revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit)) + tr.trace.Step("range keys from in-memory index tree") + if len(revpairs) == 0 { + streamC <- &RangeResult{KVs: nil, Count: total, Rev: curRev} + streamC <- nil + return nil + } + + limit := int(ro.Limit) + if limit <= 0 || limit > len(revpairs) { + limit = len(revpairs) + } + + var kvsLen int + totalRevpairsCount := len(revpairs) + + if limit <= RangeStreamBatch { + kvsLen = limit + } else { + kvsLen = RangeStreamBatch + limit -= RangeStreamBatch + } + kvs := make([]mvccpb.KeyValue, kvsLen) + + revBytes := newRevBytes() + for i, revpair := range revpairs { + revToBytes(revpair, revBytes) + _, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0) + if len(vs) != 1 { + tr.s.lg.Fatal( + "range failed to find revision pair", + zap.Int64("revision-main", revpair.main), + zap.Int64("revision-sub", revpair.sub), + ) + } + if err := kvs[i%RangeStreamBatch].Unmarshal(vs[0]); err != nil { + tr.s.lg.Fatal( + "failed to unmarshal mvccpb.KeyValue", + zap.Error(err), + ) + } + if (i+1)%RangeStreamBatch == 0 && (i+1) != totalRevpairsCount { + streamC <- &RangeResult{KVs: kvs, Count: RangeStreamBatch, Rev: curRev} + + if limit <= RangeStreamBatch { + kvsLen = limit + } else { + kvsLen = RangeStreamBatch + limit -= RangeStreamBatch + } + kvs = make([]mvccpb.KeyValue, kvsLen) + } + } + + tr.trace.Step("range keys from bolt db") + streamC <- &RangeResult{KVs: kvs, Count: kvsLen, Rev: curRev} + streamC <- nil + return nil +} + func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { rev := tw.beginRev + 1 c := rev diff --git a/server/storage/mvcc/metrics_txn.go b/server/storage/mvcc/metrics_txn.go index aef877a1c159..2a0b08ae06f3 100644 --- a/server/storage/mvcc/metrics_txn.go +++ b/server/storage/mvcc/metrics_txn.go @@ -26,14 +26,15 @@ type metricsTxnWrite struct { puts uint deletes uint putSize int64 + rangeStreams uint } func newMetricsTxnRead(tr TxnRead) TxnRead { - return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0, 0} + return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0, 0, 0} } func newMetricsTxnWrite(tw TxnWrite) TxnWrite { - return &metricsTxnWrite{tw, 0, 0, 0, 0} + return &metricsTxnWrite{tw, 0, 0, 0, 0, 0} } func (tw *metricsTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (*RangeResult, error) { @@ -41,6 +42,11 @@ func (tw *metricsTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeO return tw.TxnWrite.Range(ctx, key, end, ro) } +func (tw *metricsTxnWrite) RangeStream(key, end []byte, ro RangeOptions, streamC chan *RangeResult) (err error) { + tw.rangeStreams++ + return tw.TxnWrite.RangeStream(key, end, ro, streamC) +} + func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) { tw.deletes++ return tw.TxnWrite.DeleteRange(key, end)