Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,12 @@ func (app *BaseApp) getContextForTx(mode sdk.ExecMode, txBytes []byte) sdk.Conte
// a branched multi-store.
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, storetypes.CacheMultiStore) {
ms := ctx.MultiStore()
msCache := ms.CacheMultiStore()
var msCache storetypes.CacheMultiStore
if msPooled, ok := ms.(storetypes.PoolingMultiStore); ok {
msCache = msPooled.CacheMultiStorePooled()
} else {
msCache = ms.CacheMultiStore()
Comment on lines 632 to +639
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change potentially affects state.

Call sequence:

(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).cacheTxContext (baseapp/baseapp.go:625)
(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).runTx (baseapp/baseapp.go:760)
(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).deliverTx (baseapp/baseapp.go:688)
(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).internalFinalizeBlock (baseapp/baseapp.go:718)
(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).FinalizeBlock (baseapp/baseapp.go:884)

}
if msCache.TracingEnabled() {
msCache = msCache.SetTracingContext(
map[string]any{
Expand Down Expand Up @@ -839,6 +844,9 @@ func (app *BaseApp) runTx(mode sdk.ExecMode, txBytes []byte, tx sdk.Tx) (gInfo s
// writes do not happen if aborted/failed. This may have some
// performance benefits, but it'll be more difficult to get right.
anteCtx, msCache = app.cacheTxContext(ctx, txBytes)
if pooledMSCache, ok := msCache.(storetypes.PooledCacheMultiStore); ok {
defer pooledMSCache.Release()
}
anteCtx = anteCtx.WithEventManager(sdk.NewEventManager())
newCtx, err := app.anteHandler(anteCtx, tx, mode == execModeSimulate)

Expand Down Expand Up @@ -889,6 +897,9 @@ func (app *BaseApp) runTx(mode sdk.ExecMode, txBytes []byte, tx sdk.Tx) (gInfo s
// in case message processing fails. At this point, the MultiStore
// is a branch of a branch.
runMsgCtx, msCache := app.cacheTxContext(ctx, txBytes)
if pooledMSCache, ok := msCache.(storetypes.PooledCacheMultiStore); ok {
defer pooledMSCache.Release()
}

// Attempt to execute all messages and only update state if all messages pass
// and we're in DeliverTx. Note, runMsgs will never return a reference to a
Expand Down
5 changes: 5 additions & 0 deletions store/cachekv/internal/btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func NewBTree() BTree {
}
}

// Clear clears the tree.
func (bt BTree) Clear() {
bt.tree.Clear()
}

func (bt BTree) Set(key, value []byte) {
bt.tree.Set(newItem(key, value))
}
Expand Down
37 changes: 35 additions & 2 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ type Store struct {
parent types.KVStore
}

// PooledStore wraps a Store object and implements the types.PooledCacheKVStore interface,
// which allows it to be pooled and reused without the overhead of allocation.
type PooledStore struct {
Store
}

var _ types.CacheKVStore = (*Store)(nil)

// NewStore creates a new Store object
Expand All @@ -44,6 +50,33 @@ func NewStore(parent types.KVStore) *Store {
}
}

var storePool = sync.Pool{
New: func() any {
return &PooledStore{
Store: Store{
cache: make(map[string]*cValue),
unsortedCache: make(map[string]struct{}),
sortedCache: internal.NewBTree(),
},
}
},
}

// Release releases the PooledStore object back to the pool.
func (store *PooledStore) Release() {
store.resetCaches()
store.parent = nil
store.mtx = sync.Mutex{}
storePool.Put(store)
}

// NewPooledStore gets a PooledStore object from the pool.
func NewPooledStore(parent types.KVStore) *PooledStore {
store := storePool.Get().(*PooledStore)
store.parent = parent
return store
}

// GetStoreType implements Store.
func (store *Store) GetStoreType() types.StoreType {
return store.parent.GetStoreType()
Expand Down Expand Up @@ -112,7 +145,7 @@ func (store *Store) resetCaches() {
delete(store.unsortedCache, key)
}
}
store.sortedCache = internal.NewBTree()
store.sortedCache.Clear()
}

// Write implements Cachetypes.KVStore.
Expand All @@ -121,7 +154,7 @@ func (store *Store) Write() {
defer store.mtx.Unlock()

if len(store.cache) == 0 && len(store.unsortedCache) == 0 {
store.sortedCache = internal.NewBTree()
store.sortedCache.Clear()
return
}

Expand Down
105 changes: 88 additions & 17 deletions store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"fmt"
"io"
"maps"
"sync"

dbm "github.com/cosmos/cosmos-db"

Expand Down Expand Up @@ -33,16 +34,25 @@
traceContext types.TraceContext
}

var _ types.CacheMultiStore = Store{}
// PooledStore is a wrapper around Store that implements the PooledCacheKVStore interface.
// It's used to avoid allocating new Store instances .
type PooledStore struct {
Store
}

var (
_ types.CacheMultiStore = &Store{}
_ types.PooledCacheMultiStore = &PooledStore{}
)

// NewFromKVStore creates a new Store object from a mapping of store keys to
// CacheWrapper objects and a KVStore as the database. Each CacheWrapper store
// is a branched store.
func NewFromKVStore(
store types.KVStore, stores map[types.StoreKey]types.CacheWrapper,
keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext,
) Store {
cms := Store{
) *Store {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did we need to change this function sig?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is ultimately that sync.Pool works with pointers--if it didn't you'd end up copying the struct that it's trying to allocate for you--so I needed to be able to return a pointer to the PooledStore in newFromKVStorePooled. Once I'd made that change, interface compliance broke everywhere because the methods had value receivers. It ended up being simplest to just change everything to pointers for consistency.

cms := &Store{
db: cachekv.NewStore(store),
stores: make(map[types.StoreKey]types.CacheWrap, len(stores)),
keys: keys,
Expand All @@ -69,11 +79,67 @@
func NewStore(
db dbm.DB, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey,
traceWriter io.Writer, traceContext types.TraceContext,
) Store {
) *Store {
return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext)
}

func newCacheMultiStoreFromCMS(cms Store) Store {
var storePool = sync.Pool{
New: func() any {
return &PooledStore{
Store: Store{
stores: make(map[types.StoreKey]types.CacheWrap),
keys: make(map[string]types.StoreKey),
},
}
},
}

// newFromKVStorePooled returns a PooledStore object, populated with a mapping of store keys to
// CacheWrapper objects and a KVStore as the database.
func newFromKVStorePooled(
store types.KVStore, stores map[types.StoreKey]types.CacheWrap,
traceWriter io.Writer, traceContext types.TraceContext,
) *PooledStore {
cms := storePool.Get().(*PooledStore)
cms.traceWriter = traceWriter
cms.traceContext = traceContext
for key, store := range stores {
var cwStore types.CacheWrapper = store
if cms.TracingEnabled() {
tctx := cms.traceContext.Clone().Merge(types.TraceContext{
storeNameCtxKey: key.Name(),
})

cwStore = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, tctx)
}
cms.stores[key] = cachekv.NewPooledStore(cwStore.(types.KVStore))
}
cms.db = cachekv.NewPooledStore(store)
return cms
}

// Release releases the PooledStore object back to the pool.
func (cms *PooledStore) Release() {
// clear the stores map
for k, v := range cms.stores {
if pStore, ok := v.(*cachekv.PooledStore); ok {
pStore.Release()
}
delete(cms.stores, k)
}
for k := range cms.keys {
delete(cms.keys, k)
}
Comment on lines +134 to +136

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism
if pStoreDb, ok := cms.db.(*cachekv.PooledStore); ok {
pStoreDb.Release()
}
cms.db = nil
cms.traceContext = nil
cms.traceWriter = nil
storePool.Put(cms)
}

func newCacheMultiStoreFromCMS(cms *Store) *Store {
stores := make(map[types.StoreKey]types.CacheWrapper)
for k, v := range cms.stores {
stores[k] = v
Expand All @@ -84,7 +150,7 @@

// SetTracer sets the tracer for the MultiStore that the underlying
// stores will utilize to trace operations. A MultiStore is returned.
func (cms Store) SetTracer(w io.Writer) types.MultiStore {
func (cms *Store) SetTracer(w io.Writer) types.MultiStore {
cms.traceWriter = w
return cms
}
Expand All @@ -93,7 +159,7 @@
// the given context with the existing context by key. Any existing keys will
// be overwritten. It is implied that the caller should update the context when
// necessary between tracing operations. It returns a modified MultiStore.
func (cms Store) SetTracingContext(tc types.TraceContext) types.MultiStore {
func (cms *Store) SetTracingContext(tc types.TraceContext) types.MultiStore {
if cms.traceContext != nil {
maps.Copy(cms.traceContext, tc)
} else {
Expand All @@ -104,55 +170,60 @@
}

// TracingEnabled returns if tracing is enabled for the MultiStore.
func (cms Store) TracingEnabled() bool {
func (cms *Store) TracingEnabled() bool {
return cms.traceWriter != nil
}

// LatestVersion returns the branch version of the store
func (cms Store) LatestVersion() int64 {
func (cms *Store) LatestVersion() int64 {
panic("cannot get latest version from branch cached multi-store")
}

// GetStoreType returns the type of the store.
func (cms Store) GetStoreType() types.StoreType {
func (cms *Store) GetStoreType() types.StoreType {
return types.StoreTypeMulti
}

// Write calls Write on each underlying store.
func (cms Store) Write() {
func (cms *Store) Write() {
cms.db.Write()
for _, store := range cms.stores {
store.Write()
}
}

// CacheWrap implements CacheWrapper, returns the cache multi-store as a CacheWrap.
func (cms Store) CacheWrap() types.CacheWrap {
func (cms *Store) CacheWrap() types.CacheWrap {
return cms.CacheMultiStore().(types.CacheWrap)
}

// CacheWrapWithTrace implements the CacheWrapper interface.
func (cms Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap {
func (cms *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap {
return cms.CacheWrap()
}

// CacheMultiStore implements MultiStore, returns a new CacheMultiStore from the
// underlying CacheMultiStore.
func (cms Store) CacheMultiStore() types.CacheMultiStore {
func (cms *Store) CacheMultiStore() types.CacheMultiStore {
return newCacheMultiStoreFromCMS(cms)
}

// CacheMultiStorePooled returns a PooledCacheMultiStore object from a pool.
func (cms *Store) CacheMultiStorePooled() types.PooledCacheMultiStore {
return newFromKVStorePooled(cms.db, cms.stores, cms.traceWriter, cms.traceContext)
}

// CacheMultiStoreWithVersion implements the MultiStore interface. It will panic
// as an already cached multi-store cannot load previous versions.
//
// TODO: The store implementation can possibly be modified to support this as it
// seems safe to load previous versions (heights).
func (cms Store) CacheMultiStoreWithVersion(_ int64) (types.CacheMultiStore, error) {
func (cms *Store) CacheMultiStoreWithVersion(_ int64) (types.CacheMultiStore, error) {
panic("cannot branch cached multi-store with a version")
}

// GetStore returns an underlying Store by key.
func (cms Store) GetStore(key types.StoreKey) types.Store {
func (cms *Store) GetStore(key types.StoreKey) types.Store {
s := cms.stores[key]
if key == nil || s == nil {
panic(fmt.Sprintf("kv store with key %v has not been registered in stores", key))
Expand All @@ -161,7 +232,7 @@
}

// GetKVStore returns an underlying KVStore by key.
func (cms Store) GetKVStore(key types.StoreKey) types.KVStore {
func (cms *Store) GetKVStore(key types.StoreKey) types.KVStore {
store := cms.stores[key]
if key == nil || store == nil {
panic(fmt.Sprintf("kv store with key %v has not been registered in stores", key))
Expand Down
6 changes: 3 additions & 3 deletions store/rootmulti/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestCacheMultiStore(t *testing.T) {
ms := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))

cacheMulti := ms.CacheMultiStore()
require.IsType(t, cachemulti.Store{}, cacheMulti)
require.IsType(t, &cachemulti.Store{}, cacheMulti)
}

func TestCacheMultiStoreWithVersion(t *testing.T) {
Expand Down Expand Up @@ -819,10 +819,10 @@ func TestCacheWraps(t *testing.T) {
multi := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))

cacheWrapper := multi.CacheWrap()
require.IsType(t, cachemulti.Store{}, cacheWrapper)
require.IsType(t, &cachemulti.Store{}, cacheWrapper)

cacheWrappedWithTrace := multi.CacheWrapWithTrace(nil, nil)
require.IsType(t, cachemulti.Store{}, cacheWrappedWithTrace)
require.IsType(t, &cachemulti.Store{}, cacheWrappedWithTrace)
}

func TestTraceConcurrency(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions store/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,24 @@ type MultiStore interface {
LatestVersion() int64
}

// PoolingMultiStore is a MultiStore that can return CacheMultiStores from a pool, without needing to allocate a new one each time.
type PoolingMultiStore interface {
MultiStore
CacheMultiStorePooled() PooledCacheMultiStore
}

// CacheMultiStore extends MultiStore with a Write() method.
type CacheMultiStore interface {
MultiStore
Write() // Writes operations to underlying KVStore
}

// PooledCacheMultiStore is a CacheMultiStore that can be pooled and reused without the overhead of allocation.
type PooledCacheMultiStore interface {
CacheMultiStore
Release() // Releases the cache
}

// CommitMultiStore is an interface for a MultiStore without cache capabilities.
type CommitMultiStore interface {
Committer
Expand Down
Loading