Skip to content

Commit

Permalink
lessor: fix le.itemMap leak after lease is revoked
Browse files Browse the repository at this point in the history
Signed-off-by: qsyqian <[email protected]>
Co-authored-by: Thomas Jungblut <[email protected]>
  • Loading branch information
qsyqian and tjungblu committed Jun 16, 2023
1 parent a708bed commit 669629a
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 0 deletions.
18 changes: 18 additions & 0 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ type Lessor interface {
// Leases lists all leases.
Leases() []*Lease

// Items lists all item
Items() []string

// ExpiredLeasesC returns a chan that is used to receive expired leases.
ExpiredLeasesC() <-chan []*Lease

Expand Down Expand Up @@ -464,6 +467,16 @@ func (le *lessor) Leases() []*Lease {
return ls
}

func (le *lessor) Items() []string {
le.mu.RLock()
keys := make([]string, 0, len(le.itemMap))
for k := range le.itemMap {
keys = append(keys, k.Key)
}
le.mu.RUnlock()
return keys
}

func (le *lessor) Promote(extend time.Duration) {
le.mu.Lock()
defer le.mu.Unlock()
Expand Down Expand Up @@ -572,6 +585,9 @@ func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {

l := le.leaseMap[id]
if l == nil {
for _, it := range items {
delete(le.itemMap, it)
}
return ErrLeaseNotFound
}

Expand Down Expand Up @@ -848,6 +864,8 @@ func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return nil }

func (fl *FakeLessor) Leases() []*Lease { return nil }

func (fl *FakeLessor) Items() []string { return nil }

func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }

func (fl *FakeLessor) Recover(b backend.Backend, rd RangeDeleter) {}
Expand Down
37 changes: 37 additions & 0 deletions server/lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

Expand Down Expand Up @@ -208,6 +209,42 @@ func TestLessorRevoke(t *testing.T) {
}
}

func TestLessorGrantAttachRevokeDetach(t *testing.T) {
lg := zap.NewNop()
_, be := NewTestBackend(t)
defer be.Close()

le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })

// grant a lease with long term (100 seconds) to
// avoid early termination during the test.
l, err := le.Grant(1, 100)
if err != nil {
t.Fatalf("could not grant lease for 100s ttl (%v)", err)
}

items := []LeaseItem{
{"foo"},
{"bar"},
}

if err := le.Attach(l.ID, items); err != nil {
t.Fatalf("failed to attach items to the lease: %v", err)
}

if err := le.Revoke(l.ID); err != nil {
t.Fatalf("failed to revoke lease: %v", err)
}

// Detach items because fakeDeleter.DeleteRange will not call le.Detach.
err = le.Detach(l.ID, items)
assert.Equal(t, ErrLeaseNotFound, err)
assert.Equal(t, 0, len(le.leaseMap))
assert.Equal(t, 0, len(le.itemMap))
}

func renew(t *testing.T, le *lessor, id LeaseID) int64 {
ch := make(chan int64, 1)
errch := make(chan error, 1)
Expand Down
180 changes: 180 additions & 0 deletions server/storage/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,186 @@ func TestStorePut(t *testing.T) {
}
}

func TestStorePutDeleteWithLease(t *testing.T) {
lg := zaptest.NewLogger(t)
key := newTestKeyBytes(lg, revision{2, 0}, false)
kv := mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 2,
ModRevision: 2,
Version: 1,
Lease: 1,
}
kvb, err := kv.Marshal()
if err != nil {
t.Fatal(err)
}

tests := []struct {
// for put
r indexGetResp
rr *rangeResp

// for delete
dr indexRangeResp
drr *rangeResp
}{
{
indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
nil,

indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
&rangeResp{[][]byte{key}, [][]byte{kvb}},
},
}
for i, tt := range tests {
s := newFakeStore(lg)
s.le = lease.NewLessor(lg, s.b, nil, lease.LessorConfig{MinLeaseTTL: 5})
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)

fi.indexGetRespc <- tt.r
if tt.rr != nil {
b.tx.rangeRespc <- *tt.rr
}

l, err := s.le.Grant(1, 10)
if err != nil {
t.Errorf("failed to grant lease with ttl(10s), err: %v", err)
}

s.Put([]byte("foo"), []byte("bar"), l.ID)

leaseAfterPut := s.le.Lookup(l.ID)
wantItemSet := []string{"foo"}
itemSet := leaseAfterPut.Keys()
if !reflect.DeepEqual(itemSet, wantItemSet) {
t.Errorf("#%d: leaseAfterPut.Keys() = %+v, want %+v", i, itemSet, wantItemSet)
}

itemMap := s.le.Items()
wantItemMap := []string{"foo"}
if !reflect.DeepEqual(itemMap, wantItemMap) {
t.Errorf("#%d: lessor.Items() = %+v, want %+v", i, itemMap, wantItemMap)
}

fi.indexRangeRespc <- tt.dr
if tt.rr != nil {
b.tx.rangeRespc <- *tt.drr
}

s.DeleteRange([]byte("foo"), nil)

leaseAfterDelete := s.le.Lookup(l.ID)
wantItemSet = []string{}
itemSet = leaseAfterDelete.Keys()

if !reflect.DeepEqual(itemSet, wantItemSet) {
t.Errorf("#%d: leaseAfterDelete.Keys() = %+v, want %+v", i, itemSet, wantItemSet)
}

itemMap = s.le.Items()
wantItemMap = []string{}
if !reflect.DeepEqual(itemMap, wantItemMap) {
t.Errorf("#%d: lessor.Items() = %+v, want %+v", i, itemMap, wantItemMap)
}

s.Close()
}
}

func TestStorePutWithLeaseRevoke(t *testing.T) {
lg := zaptest.NewLogger(t)
key := newTestKeyBytes(lg, revision{2, 0}, false)
kv := mvccpb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 2,
ModRevision: 2,
Version: 1,
Lease: 1,
}
kvb, err := kv.Marshal()
if err != nil {
t.Fatal(err)
}

tests := []struct {
// for put
r indexGetResp
rr *rangeResp

// for delete
dr indexRangeResp
drr *rangeResp
}{
{
indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
nil,

indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
&rangeResp{[][]byte{key}, [][]byte{kvb}},
},
}

for i, tt := range tests {
s := newFakeStore(lg)
s.le = lease.NewLessor(lg, s.b, nil, lease.LessorConfig{MinLeaseTTL: 5})
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)

fi.indexGetRespc <- tt.r
if tt.rr != nil {
b.tx.rangeRespc <- *tt.rr
}

l, err := s.le.Grant(1, 10)
if err != nil {
t.Errorf("failed to grant lease with ttl(10s), err: %v", err)
}

s.Put([]byte("foo"), []byte("bar"), l.ID)

leaseAfterPut := s.le.Lookup(l.ID)
wantItemSet := []string{"foo"}
itemSet := leaseAfterPut.Keys()
if !reflect.DeepEqual(itemSet, wantItemSet) {
t.Errorf("#%d: leaseAfterPut.Keys() = %+v, want %+v", i, itemSet, wantItemSet)
}

itemMap := s.le.Items()
wantItemMap := []string{"foo"}
if !reflect.DeepEqual(itemMap, wantItemMap) {
t.Errorf("#%d: lessor.Items() = %+v, want %+v", i, itemMap, wantItemMap)
}

fi.indexRangeRespc <- tt.dr
if tt.rr != nil {
b.tx.rangeRespc <- *tt.drr
}

if err := s.le.Revoke(l.ID); err != nil {
t.Errorf("failed to revoke lease ID: %d, err: %v", l.ID, err)
}

leaseAfterRevoke := s.le.Lookup(l.ID)
if leaseAfterRevoke != nil {
t.Errorf("#%d: leaseAfterDelete = %+v, want nil", i, itemSet)
}

itemMap = s.le.Items()
wantItemMap = []string{}
if !reflect.DeepEqual(itemMap, wantItemMap) {
t.Errorf("#%d: lessor.Items() = %+v, want %+v", i, itemMap, wantItemMap)
}

s.Close()
}

}

func TestStoreRange(t *testing.T) {
lg := zaptest.NewLogger(t)
key := newTestKeyBytes(lg, revision{2, 0}, false)
Expand Down

0 comments on commit 669629a

Please sign in to comment.