Skip to content

Commit 995046a

Browse files
author
yihuang
authored
Problem: version mismatch happen occasionally (#1759)
* Problem: version mismatch happen occationally Solution: - make sure don't load iavl version ahead of versiondb * changelog * Update app/app.go Signed-off-by: yihuang <[email protected]> * memiavl support write old version idopodently * add unit test * fix upgrade store loader * reproduce in integration test * fix memiavl * better test * fix lint --------- Signed-off-by: yihuang <[email protected]>
1 parent 07e0e45 commit 995046a

File tree

8 files changed

+191
-27
lines changed

8 files changed

+191
-27
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* [#1748](https://github.com/crypto-org-chain/cronos/pull/1748) Query with GetCFWithTS to compare both timestamp and key to avoid run fixdata multiple times.
1111
* (versiondb) [#1751](https://github.com/crypto-org-chain/cronos/pull/1751) Add missing Destroy for read options to properly hold and release options reference.
1212
* (versiondb) [#1758](https://github.com/crypto-org-chain/cronos/pull/1758) Avoid ReadOptions mutated by reference release in iterator.
13+
* [#1759](https://github.com/crypto-org-chain/cronos/pull/1759) Fix version mismatch happen occasionally.
1314

1415
### Improvements
1516

app/app.go

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -959,9 +959,27 @@ func New(
959959
panic(err)
960960
}
961961

962+
// wire up the versiondb's `StreamingService` and `MultiStore`.
963+
if cast.ToBool(appOpts.Get("versiondb.enable")) {
964+
var err error
965+
app.qms, err = app.setupVersionDB(homePath, keys, tkeys, memKeys, okeys)
966+
if err != nil {
967+
panic(err)
968+
}
969+
}
970+
971+
var qmsVersion int64
972+
if app.qms != nil {
973+
qmsVersion = app.qms.LatestVersion()
974+
}
975+
962976
// RegisterUpgradeHandlers is used for registering any on-chain upgrades.
963977
// Make sure it's called after `app.mm` and `app.configurator` are set.
964-
app.RegisterUpgradeHandlers(app.appCodec)
978+
storeLoaderOverritten := app.RegisterUpgradeHandlers(app.appCodec, qmsVersion)
979+
if !storeLoaderOverritten {
980+
// Register the default store loader
981+
app.SetStoreLoader(MaxVersionStoreLoader(qmsVersion))
982+
}
965983

966984
// add test gRPC service for testing gRPC queries in isolation
967985
// testdata.RegisterQueryServer(app.GRPCQueryRouter(), testdata.QueryImpl{})
@@ -992,15 +1010,6 @@ func New(
9921010
app.MountMemoryStores(memKeys)
9931011
app.MountObjectStores(okeys)
9941012

995-
// wire up the versiondb's `StreamingService` and `MultiStore`.
996-
if cast.ToBool(appOpts.Get("versiondb.enable")) {
997-
var err error
998-
app.qms, err = app.setupVersionDB(homePath, keys, tkeys, memKeys, okeys)
999-
if err != nil {
1000-
panic(err)
1001-
}
1002-
}
1003-
10041013
// initialize BaseApp
10051014
app.SetInitChainer(app.InitChainer)
10061015
app.SetPreBlocker(app.PreBlocker)
@@ -1045,12 +1054,13 @@ func New(
10451054
tmos.Exit(err.Error())
10461055
}
10471056

1048-
if app.qms != nil {
1049-
v1 := app.qms.LatestVersion()
1050-
v2 := app.LastBlockHeight()
1051-
if v1 > 0 && v1 < v2 {
1057+
if qmsVersion > 0 {
1058+
// it should not happens since we constraint the loaded iavl version to not exceed the versiondb version,
1059+
// still keep the check for safety.
1060+
iavlVersion := app.LastBlockHeight()
1061+
if qmsVersion < iavlVersion {
10521062
// try to prevent gap being created in versiondb
1053-
tmos.Exit(fmt.Sprintf("versiondb version %d lag behind iavl version %d", v1, v2))
1063+
tmos.Exit(fmt.Sprintf("versiondb version %d lag behind iavl version %d", qmsVersion, iavlVersion))
10541064
}
10551065
}
10561066

app/storeloader.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package app
2+
3+
import (
4+
storetypes "cosmossdk.io/store/types"
5+
upgradetypes "cosmossdk.io/x/upgrade/types"
6+
"github.com/cosmos/cosmos-sdk/baseapp"
7+
)
8+
9+
// MaxVersionStoreLoader will be used when there's versiondb to cap the loaded iavl version
10+
func MaxVersionStoreLoader(version int64) baseapp.StoreLoader {
11+
if version == 0 {
12+
return baseapp.DefaultStoreLoader
13+
}
14+
15+
return func(ms storetypes.CommitMultiStore) error {
16+
return ms.LoadVersion(version)
17+
}
18+
}
19+
20+
// MaxVersionUpgradeStoreLoader is used to prepare baseapp with a fixed StoreLoader
21+
func MaxVersionUpgradeStoreLoader(version int64, upgradeHeight int64, storeUpgrades *storetypes.StoreUpgrades) baseapp.StoreLoader {
22+
if version == 0 {
23+
return upgradetypes.UpgradeStoreLoader(upgradeHeight, storeUpgrades)
24+
}
25+
26+
return func(ms storetypes.CommitMultiStore) error {
27+
if upgradeHeight == ms.LastCommitID().Version+1 {
28+
// Check if the current commit version and upgrade height matches
29+
if len(storeUpgrades.Renamed) > 0 || len(storeUpgrades.Deleted) > 0 || len(storeUpgrades.Added) > 0 {
30+
return ms.LoadLatestVersionAndUpgrade(storeUpgrades)
31+
}
32+
}
33+
34+
// Otherwise load default store loader
35+
return MaxVersionStoreLoader(version)(ms)
36+
}
37+
}

app/upgrades.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ import (
1717
evmtypes "github.com/evmos/ethermint/x/evm/types"
1818
)
1919

20-
func (app *App) RegisterUpgradeHandlers(cdc codec.BinaryCodec) {
20+
// RegisterUpgradeHandlers returns if store loader is overridden
21+
func (app *App) RegisterUpgradeHandlers(cdc codec.BinaryCodec, maxVersion int64) bool {
2122
planName := "v1.4"
2223
app.UpgradeKeeper.SetUpgradeHandler(planName, func(ctx context.Context, plan upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) {
2324
m, err := app.ModuleManager.RunMigrations(ctx, app.configurator, fromVM)
@@ -54,14 +55,18 @@ func (app *App) RegisterUpgradeHandlers(cdc codec.BinaryCodec) {
5455
}
5556
if !app.UpgradeKeeper.IsSkipHeight(upgradeInfo.Height) {
5657
if upgradeInfo.Name == planName {
57-
app.SetStoreLoader(upgradetypes.UpgradeStoreLoader(upgradeInfo.Height, &storetypes.StoreUpgrades{
58+
app.SetStoreLoader(MaxVersionUpgradeStoreLoader(maxVersion, upgradeInfo.Height, &storetypes.StoreUpgrades{
5859
Added: []string{
5960
icahosttypes.StoreKey,
6061
},
6162
Deleted: []string{"icaauth"},
6263
}))
64+
65+
return true
6366
}
6467
}
68+
69+
return false
6570
}
6671

6772
func UpdateExpeditedParams(ctx context.Context, gov govkeeper.Keeper) error {

integration_tests/shell.nix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,6 @@ pkgs.mkShell {
1717
shellHook = ''
1818
mkdir ./coverage
1919
export GOCOVERDIR=./coverage
20+
export TMPDIR=/tmp
2021
'';
2122
}

integration_tests/test_versiondb.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from pystarport import ports
66

77
from .network import Cronos
8-
from .utils import ADDRS, send_transaction, wait_for_port
8+
from .utils import ADDRS, send_transaction, w3_wait_for_new_blocks, wait_for_port
99

1010

1111
def test_versiondb_migration(cronos: Cronos):
@@ -37,6 +37,9 @@ def test_versiondb_migration(cronos: Cronos):
3737
balance1 = w3.eth.get_balance(community)
3838
block1 = w3.eth.block_number
3939

40+
# wait for a few blocks
41+
w3_wait_for_new_blocks(w3, 5)
42+
4043
# stop the network first
4144
print("stop all nodes")
4245
print(cronos.supervisorctl("stop", "all"))
@@ -45,7 +48,10 @@ def test_versiondb_migration(cronos: Cronos):
4548

4649
changeset_dir = tempfile.mkdtemp(dir=cronos.base_dir)
4750
print("dump to:", changeset_dir)
48-
print(cli1.changeset_dump(changeset_dir))
51+
52+
# only restore to an intermidiate version to test version mismatch behavior
53+
print(cli1.changeset_dump(changeset_dir, end_version=block1 + 1))
54+
4955
snapshot_dir = tempfile.mkdtemp(dir=cronos.base_dir)
5056
print("verify and save to snapshot:", snapshot_dir)
5157
_, commit_info = cli0.changeset_verify(changeset_dir, save_snapshot=snapshot_dir)

memiavl/db.go

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ type DB struct {
8080
mtx sync.Mutex
8181
// worker goroutine IdleTimeout = 5s
8282
snapshotWriterPool *pond.WorkerPool
83+
84+
// reusable write batch
85+
wbatch wal.Batch
8386
}
8487

8588
type Options struct {
@@ -440,8 +443,13 @@ func (db *DB) checkBackgroundSnapshotRewrite() error {
440443
db.snapshotRewriteCancel = nil
441444

442445
if result.mtree == nil {
443-
// background snapshot rewrite failed
444-
return fmt.Errorf("background snapshot rewriting failed: %w", result.err)
446+
if result.err != nil {
447+
// background snapshot rewrite failed
448+
return fmt.Errorf("background snapshot rewriting failed: %w", result.err)
449+
}
450+
451+
// background snapshot rewrite don't success, but no error to propagate, ignore it.
452+
return nil
445453
}
446454

447455
// wait for potential pending wal writings to finish, to make sure we catch up to latest state.
@@ -556,11 +564,17 @@ func (db *DB) Commit() (int64, error) {
556564
// async wal writing
557565
db.walChan <- &entry
558566
} else {
559-
bz, err := entry.data.Marshal()
567+
lastIndex, err := db.wal.LastIndex()
560568
if err != nil {
561569
return 0, err
562570
}
563-
if err := db.wal.Write(entry.index, bz); err != nil {
571+
572+
db.wbatch.Clear()
573+
if err := writeEntry(&db.wbatch, db.logger, lastIndex, &entry); err != nil {
574+
return 0, err
575+
}
576+
577+
if err := db.wal.WriteBatch(&db.wbatch); err != nil {
564578
return 0, err
565579
}
566580
}
@@ -591,13 +605,17 @@ func (db *DB) initAsyncCommit() {
591605
break
592606
}
593607

608+
lastIndex, err := db.wal.LastIndex()
609+
if err != nil {
610+
walQuit <- err
611+
return
612+
}
613+
594614
for _, entry := range entries {
595-
bz, err := entry.data.Marshal()
596-
if err != nil {
615+
if err := writeEntry(&batch, db.logger, lastIndex, entry); err != nil {
597616
walQuit <- err
598617
return
599618
}
600-
batch.Write(entry.index, bz)
601619
}
602620

603621
if err := db.wal.WriteBatch(&batch); err != nil {
@@ -749,7 +767,8 @@ func (db *DB) rewriteSnapshotBackground() error {
749767

750768
cloned.logger.Info("start rewriting snapshot", "version", cloned.Version())
751769
if err := cloned.RewriteSnapshotWithContext(ctx); err != nil {
752-
ch <- snapshotResult{err: err}
770+
// write error log but don't stop the client, it could happen when load an old version.
771+
cloned.logger.Error("failed to rewrite snapshot", "err", err)
753772
return
754773
}
755774
cloned.logger.Info("finished rewriting snapshot", "version", cloned.Version())
@@ -1093,3 +1112,17 @@ func channelBatchRecv[T any](ch <-chan *T) []*T {
10931112

10941113
return result
10951114
}
1115+
1116+
func writeEntry(batch *wal.Batch, logger Logger, lastIndex uint64, entry *walEntry) error {
1117+
bz, err := entry.data.Marshal()
1118+
if err != nil {
1119+
return err
1120+
}
1121+
1122+
if entry.index <= lastIndex {
1123+
logger.Error("commit old version idempotently", "lastIndex", lastIndex, "version", entry.index)
1124+
} else {
1125+
batch.Write(entry.index, bz)
1126+
}
1127+
return nil
1128+
}

memiavl/db_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package memiavl
33
import (
44
"encoding/hex"
55
"errors"
6+
fmt "fmt"
67
"os"
78
"path/filepath"
89
"runtime/debug"
@@ -497,3 +498,73 @@ func TestRepeatedApplyChangeSet(t *testing.T) {
497498
})
498499
require.NoError(t, err)
499500
}
501+
502+
func TestIdempotentWrite(t *testing.T) {
503+
for _, asyncCommit := range []bool{false, true} {
504+
t.Run(fmt.Sprintf("asyncCommit=%v", asyncCommit), func(t *testing.T) {
505+
testIdempotentWrite(t, asyncCommit)
506+
})
507+
}
508+
}
509+
510+
func testIdempotentWrite(t *testing.T, asyncCommit bool) {
511+
dir := t.TempDir()
512+
513+
asyncCommitBuffer := -1
514+
if asyncCommit {
515+
asyncCommitBuffer = 10
516+
}
517+
518+
db, err := Load(dir, Options{
519+
CreateIfMissing: true,
520+
InitialStores: []string{"test1", "test2"},
521+
AsyncCommitBuffer: asyncCommitBuffer,
522+
})
523+
require.NoError(t, err)
524+
525+
// generate some data into db
526+
var changes [][]*NamedChangeSet
527+
for i := 0; i < 10; i++ {
528+
cs := []*NamedChangeSet{
529+
{
530+
Name: "test1",
531+
Changeset: ChangeSet{Pairs: mockKVPairs("hello", fmt.Sprintf("world%d", i))},
532+
},
533+
{
534+
Name: "test2",
535+
Changeset: ChangeSet{Pairs: mockKVPairs("hello", fmt.Sprintf("world%d", i))},
536+
},
537+
}
538+
changes = append(changes, cs)
539+
}
540+
541+
for _, cs := range changes {
542+
require.NoError(t, db.ApplyChangeSets(cs))
543+
_, err := db.Commit()
544+
require.NoError(t, err)
545+
}
546+
547+
commitInfo := *db.LastCommitInfo()
548+
require.NoError(t, db.Close())
549+
550+
// reload db from disk at an intermediate version
551+
db, err = Load(dir, Options{TargetVersion: 5})
552+
require.NoError(t, err)
553+
554+
// replay some random writes to reach same version
555+
for i := 0; i < 5; i++ {
556+
require.NoError(t, db.ApplyChangeSets(changes[i+5]))
557+
_, err := db.Commit()
558+
require.NoError(t, err)
559+
}
560+
561+
// it should reach same result
562+
require.Equal(t, commitInfo, *db.LastCommitInfo())
563+
564+
require.NoError(t, db.Close())
565+
566+
// reload db again, it should reach same result
567+
db, err = Load(dir, Options{})
568+
require.NoError(t, err)
569+
require.Equal(t, commitInfo, *db.LastCommitInfo())
570+
}

0 commit comments

Comments
 (0)