Skip to content

Commit

Permalink
meta/rename: lock both src and dst parent for rename op to avoid txn …
Browse files Browse the repository at this point in the history
…conflicts and performance (#5574)

Signed-off-by: Changxin Miao <[email protected]>
  • Loading branch information
polyrabbit authored Feb 14, 2025
1 parent 7c29128 commit 61fc5d9
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 13 deletions.
30 changes: 30 additions & 0 deletions pkg/meta/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,36 @@ func (r *baseMeta) txUnlock(idx uint) {
r.txlocks[idx%nlocks].Unlock()
}

func (r *baseMeta) txBatchLock(inodes ...Ino) func() {
switch len(inodes) {
case 0:
return func() {}
case 1: // most cases
r.txLock(uint(inodes[0]))
return func() { r.txUnlock(uint(inodes[0])) }
default: // for rename and more
inodeSlots := make([]int, len(inodes))
for i, ino := range inodes {
inodeSlots[i] = int(ino % nlocks)
}
sort.Ints(inodeSlots)
uniqInodeSlots := inodeSlots[:0]
for i := 0; i < len(inodeSlots); i++ { // Go does not support recursive locks
if i == 0 || inodeSlots[i] != inodeSlots[i-1] {
uniqInodeSlots = append(uniqInodeSlots, inodeSlots[i])
}
}
for _, idx := range uniqInodeSlots {
r.txlocks[idx].Lock()
}
return func() {
for _, idx := range uniqInodeSlots {
r.txlocks[idx].Unlock()
}
}
}
}

func (r *baseMeta) OnMsg(mtype uint32, cb MsgCallback) {
r.msgCallbacks.Lock()
defer r.msgCallbacks.Unlock()
Expand Down
60 changes: 60 additions & 0 deletions pkg/meta/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"os"
"reflect"
"runtime"
Expand Down Expand Up @@ -3282,3 +3283,62 @@ func TestSymlinkCache(t *testing.T) {
cache.doClean()
require.Equal(t, int32(8000), cache.size.Load())
}

func TestTxBatchLock(t *testing.T) {
var base baseMeta
// 0 inode
func() {
defer base.txBatchLock()()
}()
// 1 inodes
func() {
defer base.txBatchLock(2)()
}()
// 2 inodes
func() {
defer base.txBatchLock(1, 2)()
}()
// no reentrant
func() {
defer base.txBatchLock(1, 1, nlocks+1)()
}()
// no deadlock - sequential
func() {
batch1 := []Ino{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
batch2 := []Ino{1 + nlocks*9, 2 + nlocks*8, 3 + nlocks*7, 4 + nlocks*6, 5 + nlocks*5, 6 + nlocks*4, 7 + nlocks*3, 8 + nlocks*2, 9 + nlocks, 10}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(2)
go func() {
defer wg.Done()
defer base.txBatchLock(batch1...)()
}()
go func() {
defer wg.Done()
defer base.txBatchLock(batch2...)()
}()
}
wg.Wait()
}()
// no deadlock - fuzz testing
func() {
var batch1, batch2 []Ino
for i := 0; i < 100; i++ {
batch1 = append(batch1, Ino(rand.Uint64()+1))
batch2 = append(batch2, Ino(rand.Uint64()+1))
}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(2)
go func() {
defer wg.Done()
defer base.txBatchLock(batch1...)()
}()
go func() {
defer wg.Done()
defer base.txBatchLock(batch2...)()
}()
}
wg.Wait()
}()
}
11 changes: 6 additions & 5 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,10 +918,7 @@ func (m *dbMeta) txn(f func(s *xorm.Session) error, inodes ...Ino) error {
inodes = []Ino{1}
}

if len(inodes) > 0 {
m.txLock(uint(inodes[0]))
defer m.txUnlock(uint(inodes[0]))
}
defer m.txBatchLock(inodes...)()
var lastErr error
for i := 0; i < 50; i++ {
_, err := m.db.Transaction(func(s *xorm.Session) (interface{}, error) {
Expand Down Expand Up @@ -1940,6 +1937,10 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
var dino Ino
var dn node
var newSpace, newInode int64
parentLocks := []Ino{parentDst}
if !isTrash(parentSrc) { // there should be no conflict if parentSrc is in trash, relax lock to accelerate `restore` subcommand
parentLocks = append(parentLocks, parentSrc)
}
err := m.txn(func(s *xorm.Session) error {
opened = false
dino = 0
Expand Down Expand Up @@ -2262,7 +2263,7 @@ func (m *dbMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
}
}
return err
})
}, parentLocks...)
if err == nil && !exchange && trash == 0 {
if dino > 0 && dn.Type == TypeFile && dn.Nlink == 0 {
m.fileDeleted(opened, false, dino, dn.Length)
Expand Down
13 changes: 5 additions & 8 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,10 +809,7 @@ func (m *kvMeta) txn(ctx context.Context, f func(tx *kvTxn) error, inodes ...Ino
}
start := time.Now()
defer func() { m.txDist.Observe(time.Since(start).Seconds()) }()
if len(inodes) > 0 {
m.txLock(uint(inodes[0]))
defer m.txUnlock(uint(inodes[0]))
}
defer m.txBatchLock(inodes...)()
var lastErr error
for i := 0; i < 50; i++ {
err := m.client.txn(ctx, f, i)
Expand Down Expand Up @@ -1491,9 +1488,9 @@ func (m *kvMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
var dtyp uint8
var tattr Attr
var newSpace, newInode int64
lockParent := parentSrc
if isTrash(lockParent) {
lockParent = parentDst
parentLocks := []Ino{parentDst}
if !isTrash(parentSrc) { // there should be no conflict if parentSrc is in trash, relax lock to accelerate `restore` subcommand
parentLocks = append(parentLocks, parentSrc)
}
err := m.txn(ctx, func(tx *kvTxn) error {
opened = false
Expand Down Expand Up @@ -1731,7 +1728,7 @@ func (m *kvMeta) doRename(ctx Context, parentSrc Ino, nameSrc string, parentDst
tx.set(m.inodeKey(parentDst), m.marshal(&dattr))
}
return nil
}, lockParent)
}, parentLocks...)
if err == nil && !exchange && trash == 0 {
if dino > 0 && dtyp == TypeFile && tattr.Nlink == 0 {
m.fileDeleted(opened, false, dino, tattr.Length)
Expand Down

0 comments on commit 61fc5d9

Please sign in to comment.