Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/impl splistore #6231

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7d5f66c
feat: rm useless store
LinZexiao Nov 9, 2023
a384c93
feat: modify fs repo for splitstore
LinZexiao Nov 9, 2023
e16e356
feat: add more log about MinerGetBaseInfo
LinZexiao Nov 20, 2023
611e023
feat: add SetLogLevel to set log level
LinZexiao Nov 20, 2023
c7c83c8
feat: add ForEachKey to BadgerBlockstore
LinZexiao Nov 20, 2023
ec298c7
feat: add splitstore
LinZexiao Nov 20, 2023
9c549bf
feat: add unit test
LinZexiao Nov 20, 2023
aaed1eb
feat: add split store
LinZexiao Nov 20, 2023
c884ebe
fix: inject initstore
LinZexiao Nov 20, 2023
776e515
fix: call putmany on primary
LinZexiao Nov 20, 2023
601f1c7
feat: add log
LinZexiao Nov 20, 2023
bf83de3
feat: add config for splitstore:
LinZexiao Nov 20, 2023
6c76c1d
feat: add lock for close
LinZexiao Nov 21, 2023
7c31d37
fix: try best to walk more object
LinZexiao Nov 21, 2023
2904ddb
feat: add cmd to rollback splitstore
LinZexiao Nov 22, 2023
66a0408
feat: limit store size to half of chan finality
LinZexiao Nov 22, 2023
1c06ee4
feat: a smaller db for test
LinZexiao Nov 22, 2023
b39a27b
fix: enable start with splitstore
LinZexiao Nov 22, 2023
9b339e4
feat: store size must bigger than chain finality
LinZexiao Nov 22, 2023
40bdbf8
fix: typo
LinZexiao Nov 23, 2023
5c3656c
fix: make lint happy
LinZexiao Nov 23, 2023
6e956a7
fix: var name overlap
LinZexiao Nov 23, 2023
dc9d2be
feat: walk object concurrently
LinZexiao Nov 28, 2023
2e41814
feat: walk state for 4 finality
LinZexiao Dec 25, 2023
72f895f
feat: protect head cid
LinZexiao Dec 29, 2023
4f67ef5
chore: rm temp test
LinZexiao Dec 29, 2023
103feac
chore: ignore some unused
LinZexiao Dec 29, 2023
3f7daef
fix: return error when rollback disable
LinZexiao Dec 29, 2023
969292b
fix: git ignore lock of db
LinZexiao Dec 29, 2023
86bf950
fix : unit test for splitstore
LinZexiao Dec 29, 2023
f6c30c5
feat: enable soft delete in splitstore
LinZexiao Dec 29, 2023
7979e67
chore: rm TestWalkOneState
LinZexiao Dec 29, 2023
0bc46e3
feat: make lint happy
LinZexiao Dec 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: add unit test
LinZexiao committed Dec 20, 2023
commit 9c549bfe5b0662acf8fa0f3624206f26e4b6fafb
382 changes: 382 additions & 0 deletions venus-shared/blockstore/splitstore/compose_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,382 @@
package splitstore

import (
"context"
"path/filepath"
"testing"
"time"

"github.com/filecoin-project/venus/venus-shared/blockstore"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"github.com/stretchr/testify/require"
)

func TestComposeStoreGet(t *testing.T) {
ctx := context.Background()
composeStore, primaryStore, secondaryStore, tertiaryStore := getBlockstore(t)
blocksInPrimary := []blocks.Block{
newBlock("b1"),
newBlock("b2"),
newBlock("b3"),
}
blocksInSecondary := []blocks.Block{
newBlock("b3"),
newBlock("b4"),
}
blocksInTertiary := []blocks.Block{
newBlock("b4"),
newBlock("b5"),
}
blockNotExist := newBlock("b6")

for _, b := range blocksInPrimary {
require.NoError(t, primaryStore.Put(ctx, b))
}
for _, b := range blocksInSecondary {
require.NoError(t, secondaryStore.Put(ctx, b))
}
for _, b := range blocksInTertiary {
require.NoError(t, tertiaryStore.Put(ctx, b))
}

t.Run("Get", func(t *testing.T) {
for _, b := range blocksInPrimary {
b, err := composeStore.Get(ctx, b.Cid())
require.NoError(t, err)
require.Equal(t, b.RawData(), b.RawData())
}
for _, b := range blocksInSecondary {
b, err := composeStore.Get(ctx, b.Cid())
require.NoError(t, err)
require.Equal(t, b.RawData(), b.RawData())
}
for _, b := range blocksInTertiary {
b, err := composeStore.Get(ctx, b.Cid())
require.NoError(t, err)
require.Equal(t, b.RawData(), b.RawData())
}

_, err := composeStore.Get(ctx, blockNotExist.Cid())
require.True(t, ipld.IsNotFound(err))

// test for sync
// wait for sync (switch goroutine)
time.Sleep(5 * time.Millisecond)
for _, b := range blocksInTertiary {
b, err := primaryStore.Get(ctx, b.Cid())
require.NoError(t, err)
require.Equal(t, b.RawData(), b.RawData())
}
})
}

func TestComposeStoreGetSize(t *testing.T) {
ctx := context.Background()
composeStore, primaryStore, secondaryStore, _ := getBlockstore(t)
blocksInPrimary := []blocks.Block{
newBlock("b1"),
newBlock("b2"),
newBlock("b3"),
}
blocksInSecondary := []blocks.Block{
newBlock("b3"),
newBlock("b4"),
}
blockNotExist := newBlock("b5")

for _, b := range blocksInPrimary {
require.NoError(t, primaryStore.Put(ctx, b))
}
for _, b := range blocksInSecondary {
require.NoError(t, secondaryStore.Put(ctx, b))
}

t.Run("GetSize", func(t *testing.T) {
for _, b := range blocksInPrimary {
sz, err := composeStore.GetSize(ctx, b.Cid())
require.NoError(t, err)
require.Equal(t, len(b.RawData()), sz)
}
for _, b := range blocksInSecondary {
sz, err := composeStore.GetSize(ctx, b.Cid())
require.NoError(t, err)
require.Equal(t, len(b.RawData()), sz)
}

_, err := composeStore.GetSize(ctx, blockNotExist.Cid())
require.True(t, ipld.IsNotFound(err))

// test for sync
// wait for sync (switch goroutine)
time.Sleep(1 * time.Millisecond)
for _, b := range blocksInSecondary {
sz, err := primaryStore.GetSize(ctx, b.Cid())
require.NoError(t, err)
require.Equal(t, len(b.RawData()), sz)
}
})
}

func TestComposeStoreView(t *testing.T) {
ctx := context.Background()
composeStore, primaryStore, secondaryStore, _ := getBlockstore(t)
blocksInPrimary := []blocks.Block{
newBlock("b1"),
newBlock("b2"),
newBlock("b3"),
}
blocksInSecondary := []blocks.Block{
newBlock("b3"),
newBlock("b4"),
}
blockNotExist := newBlock("b5")

for _, b := range blocksInPrimary {
require.NoError(t, primaryStore.Put(ctx, b))
}
for _, b := range blocksInSecondary {
require.NoError(t, secondaryStore.Put(ctx, b))
}

t.Run("View", func(t *testing.T) {
for _, b := range blocksInPrimary {
composeStore.View(ctx, b.Cid(), func(b []byte) error {
require.Equal(t, b, b)
return nil
})
}
for _, b := range blocksInSecondary {
composeStore.View(ctx, b.Cid(), func(b []byte) error {
require.Equal(t, b, b)
return nil
})
}

err := composeStore.View(ctx, blockNotExist.Cid(), func(b []byte) error {
require.Nil(t, b)
return nil
})
require.True(t, ipld.IsNotFound(err))

// test for sync
// wait for sync (switch goroutine)
for _, b := range blocksInSecondary {
primaryStore.View(ctx, b.Cid(), func(b []byte) error {
require.Equal(t, b, b)
return nil
})
}
})
}

func TestComposeStoreHas(t *testing.T) {
ctx := context.Background()
composeStore, primaryStore, secondaryStore, _ := getBlockstore(t)
blocksInPrimary := []blocks.Block{
newBlock("b1"),
newBlock("b2"),
newBlock("b3"),
}
blocksInSecondary := []blocks.Block{
newBlock("b3"),
newBlock("b4"),
}
blockNotExist := newBlock("b5")

for _, b := range blocksInPrimary {
require.NoError(t, primaryStore.Put(ctx, b))
}
for _, b := range blocksInSecondary {
require.NoError(t, secondaryStore.Put(ctx, b))
}

t.Run("Has", func(t *testing.T) {
for _, b := range blocksInPrimary {
h, err := composeStore.Has(ctx, b.Cid())
require.NoError(t, err)
require.True(t, h)
}
for _, b := range blocksInSecondary {
h, err := composeStore.Has(ctx, b.Cid())
require.NoError(t, err)
require.True(t, h)
}

h, err := composeStore.Has(ctx, blockNotExist.Cid())
require.NoError(t, err)
require.False(t, h)

// test for sync
// wait for sync (switch goroutine)
time.Sleep(1 * time.Millisecond)
for _, b := range blocksInSecondary {
h, err := primaryStore.Has(ctx, b.Cid())
require.NoError(t, err)
require.True(t, h)
}
})
}

func TestComposeStoreAllKeysChan(t *testing.T) {
ctx := context.Background()
composeStore, primaryStore, secondaryStore, tertiaryStore := getBlockstore(t)
blocksInPrimary := []blocks.Block{
newBlock("b1"),
newBlock("b2"),
newBlock("b3"),
}
blocksInSecondary := []blocks.Block{
newBlock("b3"),
newBlock("b4"),
}
blocksInTertiary := []blocks.Block{
newBlock("b5"),
newBlock("b6"),
}

for _, b := range blocksInPrimary {
require.NoError(t, primaryStore.Put(ctx, b))
}
for _, b := range blocksInSecondary {
require.NoError(t, secondaryStore.Put(ctx, b))
}
for _, b := range blocksInTertiary {
require.NoError(t, tertiaryStore.Put(ctx, b))
}

t.Run("All keys chan", func(t *testing.T) {
ch, err := composeStore.AllKeysChan(ctx)
require.NoError(t, err)
require.NotNil(t, ch)

cidGet := cid.NewSet()
for cid := range ch {
require.True(t, cidGet.Visit(cid))
}
for _, b := range blocksInPrimary {
require.False(t, cidGet.Has(b.Cid()))
}
for _, b := range blocksInSecondary {
require.False(t, cidGet.Has(b.Cid()))
}

require.Equal(t, 6, cidGet.Len())
})
}

func TestComposeStorePut(t *testing.T) {
ctx := context.Background()
composeStore, primaryStore, secondaryStore, _ := getBlockstore(t)
blockNotExist := newBlock("b5")

t.Run("Put", func(t *testing.T) {
require.NoError(t, composeStore.Put(ctx, blockNotExist))
h, err := composeStore.Has(ctx, blockNotExist.Cid())
require.NoError(t, err)
require.True(t, h)

h, err = primaryStore.Has(ctx, blockNotExist.Cid())
require.NoError(t, err)
require.True(t, h)

h, err = secondaryStore.Has(ctx, blockNotExist.Cid())
require.NoError(t, err)
require.False(t, h)
})
}

func TestComposeStoreDelete(t *testing.T) {
ctx := context.Background()
composeStore, primaryStore, secondaryStore, tertiaryStore := getBlockstore(t)
blocksInPrimary := []blocks.Block{
newBlock("b1"),
newBlock("b2"),
newBlock("b3"),
}
blocksInSecondary := []blocks.Block{
newBlock("b3"),
newBlock("b4"),
}
blocksInTertiary := []blocks.Block{
newBlock("b3"),
newBlock("b6"),
}
blockNotExist := newBlock("b5")

for _, b := range blocksInPrimary {
require.NoError(t, primaryStore.Put(ctx, b))
}
for _, b := range blocksInSecondary {
require.NoError(t, secondaryStore.Put(ctx, b))
}
for _, b := range blocksInTertiary {
require.NoError(t, tertiaryStore.Put(ctx, b))
}

t.Run("Delete", func(t *testing.T) {
for _, b := range blocksInPrimary {
err := composeStore.DeleteBlock(ctx, b.Cid())
require.NoError(t, err)

h, err := composeStore.Has(ctx, b.Cid())
require.NoError(t, err)
require.False(t, h)
}
for _, b := range blocksInSecondary {
err := composeStore.DeleteBlock(ctx, b.Cid())
require.NoError(t, err)

h, err := composeStore.Has(ctx, b.Cid())
require.NoError(t, err)
require.False(t, h)
}

err := composeStore.DeleteBlock(ctx, blockNotExist.Cid())
require.NoError(t, err)
})
}

func TestNewComposeStore(t *testing.T) {
tempDir := t.TempDir()

primaryPath := filepath.Join(tempDir, "primary")
optPri, err := blockstore.BadgerBlockstoreOptions(primaryPath, false)
require.NoError(t, err)
dsPri, err := blockstore.Open(optPri)
require.NoError(t, err)

cs := NewComposeStore(dsPri)
_, err = cs.Has(context.Background(), cid.Undef)
require.NoError(t, err)
}

func getBlockstore(t *testing.T) (compose, primary, secondary, tertiary blockstore.Blockstore) {
tempDir := t.TempDir()

primaryPath := filepath.Join(tempDir, "primary")
secondaryPath := filepath.Join(tempDir, "secondary")
tertiaryPath := filepath.Join(tempDir, "tertiary")

optPri, err := blockstore.BadgerBlockstoreOptions(primaryPath, false)
require.NoError(t, err)
dsPri, err := blockstore.Open(optPri)
require.NoError(t, err)

optSnd, err := blockstore.BadgerBlockstoreOptions(secondaryPath, false)
require.NoError(t, err)
dsSnd, err := blockstore.Open(optSnd)
require.NoError(t, err)

optTertiary, err := blockstore.BadgerBlockstoreOptions(tertiaryPath, false)
require.NoError(t, err)
dsTertiary, err := blockstore.Open(optTertiary)
require.NoError(t, err)

return NewComposeStore(dsTertiary, dsSnd, dsPri), dsPri, dsSnd, dsTertiary
}

func newBlock(s string) blocks.Block {
return blocks.NewBlock([]byte(s))
}
154 changes: 154 additions & 0 deletions venus-shared/blockstore/splitstore/splitstore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package splitstore

import (
"context"
"fmt"
"os"
"path/filepath"
"testing"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/venus/venus-shared/types"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
)

func TestSplitstore(t *testing.T) {
ctx := context.Background()
tempDir := t.TempDir()
// tempDir := "/root/tanlang/venus/.vscode/test"
splitstorePath := filepath.Join(tempDir, "splitstore")

initStore, err := openStore("./test_data/test_20615_bafy2bzacea53rxdtdrsaovap3bsfpash2sx2cu5ho2unoxg24kl2z3opcjjda")
require.NoError(t, err)

ss, err := NewSplitstore(splitstorePath, initStore)
require.NoError(t, err)

blockCid := cid.MustParse("bafy2bzacea53rxdtdrsaovap3bsfpash2sx2cu5ho2unoxg24kl2z3opcjjda")
tskCid, err := types.NewTipSetKey(blockCid).Cid()

// apply head change to append new store
b, err := ss.getBlock(ctx, blockCid)
require.NoError(t, err)
ts, err := types.NewTipSet([]*types.BlockHeader{b})
require.NoError(t, err)
ss.HeadChange(nil, []*types.TipSet{ts})
require.Len(t, ss.stores, 2)

seenBefore := NewSyncVisitor()
t.Run("initial walk chain", func(t *testing.T) {
err = WalkChain(ctx, ss, tskCid, seenBefore, 0)
require.NoError(t, err)
})

seenAfter := NewSyncVisitor()
t.Run("walk chain after initialize", func(t *testing.T) {
err = WalkChain(ctx, ss.stores[1], tskCid, seenAfter, 0)
require.NoError(t, err)
})

require.Equal(t, seenBefore.Len(), seenAfter.Len())
}

func TestNewSplitstore(t *testing.T) {
tempDir := t.TempDir()
tempBlocks := []blocks.Block{
newBlock("b1"),
newBlock("b2"),
newBlock("b3"),
newBlock("b4"),
newBlock("b4"),
}

for i, b := range tempBlocks {
storePath := fmt.Sprintf("base_%d_%s.db", 10+i, b.Cid())
storePath = filepath.Join(tempDir, storePath)
err := os.MkdirAll(storePath, 0777)
require.NoError(t, err)
}

ss, err := NewSplitstore(tempDir, nil)
require.NoError(t, err)
require.Len(t, ss.stores, ss.maxStoreCount)
}

func TestScan(t *testing.T) {
tempDir := t.TempDir()
tempBlocks := []blocks.Block{
newBlock("b1"),
newBlock("b2"),
newBlock("b3"),
}

for i, b := range tempBlocks {
storePath := fmt.Sprintf("base_%d_%s.db", 10+i, b.Cid())
storePath = filepath.Join(tempDir, storePath)
err := os.MkdirAll(storePath, 0777)
require.NoError(t, err)
}

// base_0_init.db(place holder)
err := os.MkdirAll(filepath.Join(tempDir, "base_0_init.db"), 0777)
require.NoError(t, err)

// any.db will not be scanned in
err = os.MkdirAll(filepath.Join(tempDir, "any.db"), 0777)
require.NoError(t, err)

bs, err := scan(tempDir)
require.NoError(t, err)

t.Run("scan in", func(t *testing.T) {
require.Len(t, bs, len(tempBlocks)+1)

for i, b := range tempBlocks {
require.Equal(t, b.Cid(), bs[i+1].Base())
}

// store from place holder should be empty
require.Nil(t, bs[0].Blockstore)
})

t.Run("slean up", func(t *testing.T) {
for i := range bs {
store := bs[i]
err := store.Clean()
require.NoError(t, err)
}

bs, err = scan(tempDir)
require.NoError(t, err)
require.Len(t, bs, 0)
})
}

func TestExtractHeightAndCid(t *testing.T) {
h, c, err := extractHeightAndCid("base_10_b1.db")
require.NoError(t, err)
require.Equal(t, int64(10), h)
require.Equal(t, "b1", c)

h, c, err = extractHeightAndCid("base_10_b1")
require.Error(t, err)

h, c, err = extractHeightAndCid("base_b1")
require.Error(t, err)
}

func fakeTipset(height abi.ChainEpoch) *types.TipSet {
c, _ := abi.CidBuilder.Sum([]byte("any"))

bh := &types.BlockHeader{
Miner: address.TestAddress,
Messages: c,
ParentStateRoot: c,
ParentMessageReceipts: c,
Height: height,
}

ts, _ := types.NewTipSet([]*types.BlockHeader{bh})
return ts
}
93 changes: 93 additions & 0 deletions venus-shared/blockstore/splitstore/walk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package splitstore

import (
"context"
"fmt"
"math/rand"
"testing"

"github.com/dgraph-io/badger/v2"
"github.com/filecoin-project/venus/venus-shared/blockstore"
"github.com/filecoin-project/venus/venus-shared/logging"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/stretchr/testify/require"
)

func init() {
err := logging.SetLogLevel("splitstore", "debug")
if err != nil {
panic(err)
}
}

func TestWalk(t *testing.T) {
ctx := context.Background()

log.Info("log level")
log.Debug("log level")

badgerPath := "./test_data/test_20615_bafy2bzacea53rxdtdrsaovap3bsfpash2sx2cu5ho2unoxg24kl2z3opcjjda"
blockCid := cid.MustParse("bafy2bzacea53rxdtdrsaovap3bsfpash2sx2cu5ho2unoxg24kl2z3opcjjda")

ds, err := openStore(badgerPath)
require.NoError(t, err)

cst := cbor.NewCborStore(ds)

var b types.BlockHeader
err = cst.Get(ctx, blockCid, &b)
require.NoError(t, err)

tsk := types.NewTipSetKey(blockCid)
require.False(t, tsk.IsEmpty())

tskCid, err := tsk.Cid()
require.NoError(t, err)

seen := NewSyncVisitor()

err = WalkChain(ctx, ds, tskCid, seen, 14)
require.NoError(t, err)
}

func openStore(path string) (*blockstore.BadgerBlockstore, error) {
opt, err := blockstore.BadgerBlockstoreOptions(path, false)
opt.Prefix = "blocks"
if err != nil {
return nil, err
}
return blockstore.Open(opt)
}

func TestGetAllKeys(t *testing.T) {
// 打开数据库
badgerPath := "/root/tanlang/venus/.vscode/venus_16391_bafy2bzacebdffckqhdnvm767hon3osuv77iryvcjfcvtob6sm2vz6je346k5o"
db, err := badger.Open(badger.DefaultOptions(badgerPath))
if err != nil {
log.Fatal(err)
}
defer db.Close()

// 创建一个事务
txn := db.NewTransaction(false)
defer txn.Discard()

// 迭代遍历所有的 keys
opts := badger.DefaultIteratorOptions
opts.PrefetchSize = 10
it := txn.NewIterator(opts)
defer it.Close()

for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := item.KeyCopy(nil)
fmt.Println(string(key))
}
}

func TestRand(t *testing.T) {
rand.New(rand.NewSource(0))
rand.Seed(0)
}