From a4e10c72f32d582c05ea049bfdf2740a226aba8c Mon Sep 17 00:00:00 2001 From: yihuang Date: Fri, 22 Mar 2024 09:14:32 +0800 Subject: [PATCH 1/2] Problem: parallel tx execution is not supported (#205) add basic support in sdk: - add a TxExecutor baseapp option - add TxIndex/TxCount/MsgIndex in context Update CHANGELOG.md Signed-off-by: yihuang fix misspell fix lint run gci fix lint gci seems not compatible with gofumpt --- CHANGELOG.md | 8 +++- baseapp/abci.go | 85 ++++++++++++++++++++++++++--------------- baseapp/baseapp.go | 28 +++++++++++--- baseapp/genesis.go | 2 +- baseapp/options.go | 10 +++++ baseapp/test_helpers.go | 4 +- baseapp/txexecutor.go | 16 ++++++++ types/context.go | 35 +++++++++++++++++ 8 files changed, 148 insertions(+), 40 deletions(-) create mode 100644 baseapp/txexecutor.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 27b05d57f0ce..e1faa8f69d23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -166,7 +166,13 @@ Ref: https://keepachangelog.com/en/1.0.0/ ## [v0.50.9](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.50.9) - 2024-08-07 -## Bug Fixes +### Features + +* (baseapp) [#205](https://github.com/crypto-org-chain/cosmos-sdk/pull/205) Add `TxExecutor` baseapp option, add `TxIndex`/`TxCount`/`MsgIndex`/`BlockGasUsed` fields to `Context, to support tx parallel execution. + +## [Unreleased-Upstream] + +### Bug Fixes * (baseapp) [#21159](https://github.com/cosmos/cosmos-sdk/pull/21159) Return PreBlocker events in FinalizeBlockResponse. * [#20939](https://github.com/cosmos/cosmos-sdk/pull/20939) Fix collection reverse iterator to include `pagination.key` in the result. diff --git a/baseapp/abci.go b/baseapp/abci.go index 8d4d83befa5c..306f34bbf84d 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -791,48 +791,34 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request // Reset the gas meter so that the AnteHandlers aren't required to gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context()) - app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter)) + app.finalizeBlockState.SetContext( + app.finalizeBlockState.Context(). + WithBlockGasMeter(gasMeter). + WithTxCount(len(req.Txs)), + ) // Iterate over all raw transactions in the proposal and attempt to execute // them, gathering the execution results. // // NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g. // vote extensions, so skip those. - txResults := make([]*abci.ExecTxResult, 0, len(req.Txs)) - for _, rawTx := range req.Txs { - var response *abci.ExecTxResult - - if _, err := app.txDecoder(rawTx); err == nil { - response = app.deliverTx(rawTx) - } else { - // In the case where a transaction included in a block proposal is malformed, - // we still want to return a default response to comet. This is because comet - // expects a response for each transaction included in a block proposal. - response = sdkerrors.ResponseExecTxResultWithEvents( - sdkerrors.ErrTxDecode, - 0, - 0, - nil, - false, - ) - } - - // check after every tx if we should abort - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - // continue - } - - txResults = append(txResults, response) + txResults, err := app.executeTxs(ctx, req.Txs) + if err != nil { + // usually due to canceled + return nil, err } if app.finalizeBlockState.ms.TracingEnabled() { app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore) } - endBlock, err := app.endBlock(app.finalizeBlockState.Context()) + var blockGasUsed uint64 + for _, res := range txResults { + blockGasUsed += uint64(res.GasUsed) + } + sdkCtx := app.finalizeBlockState.Context().WithBlockGasUsed(blockGasUsed) + + endBlock, err := app.endBlock(sdkCtx) if err != nil { return nil, err } @@ -856,6 +842,45 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request }, nil } +func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) { + if app.txExecutor != nil { + return app.txExecutor(ctx, len(txs), app.finalizeBlockState.ms, func(i int, ms storetypes.MultiStore) *abci.ExecTxResult { + return app.deliverTxWithMultiStore(txs[i], i, ms) + }) + } + + txResults := make([]*abci.ExecTxResult, 0, len(txs)) + for i, rawTx := range txs { + var response *abci.ExecTxResult + + if _, err := app.txDecoder(rawTx); err == nil { + response = app.deliverTx(rawTx, i) + } else { + // In the case where a transaction included in a block proposal is malformed, + // we still want to return a default response to comet. This is because comet + // expects a response for each transaction included in a block proposal. + response = sdkerrors.ResponseExecTxResultWithEvents( + sdkerrors.ErrTxDecode, + 0, + 0, + nil, + false, + ) + } + + // check after every tx if we should abort + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + // continue + } + + txResults = append(txResults, response) + } + return txResults, nil +} + // FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock. // Specifically, it will execute an application's BeginBlock (if defined), followed // by the transactions in the proposal, finally followed by the application's diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index c10fa15936a9..9dbae0ba5629 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -198,6 +198,9 @@ type BaseApp struct { // // SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler. disableBlockGasMeter bool + + // Optional alternative tx executor, used for block-stm parallel transaction execution. + txExecutor TxExecutor } // NewBaseApp returns a reference to an initialized BaseApp. It accepts a @@ -674,17 +677,17 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter { } // retrieve the context for the tx w/ txBytes and other memoized values. -func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context { +func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context { app.mu.Lock() defer app.mu.Unlock() - modeState := app.getState(mode) if modeState == nil { panic(fmt.Sprintf("state is nil for mode %v", mode)) } ctx := modeState.Context(). WithTxBytes(txBytes). - WithGasMeter(storetypes.NewInfiniteGasMeter()) + WithGasMeter(storetypes.NewInfiniteGasMeter()). + WithTxIndex(txIndex) // WithVoteInfos(app.voteInfos) // TODO: identify if this is needed ctx = ctx.WithIsSigverifyTx(app.sigverifyTx) @@ -769,7 +772,11 @@ func (app *BaseApp) beginBlock(_ *abci.RequestFinalizeBlock) (sdk.BeginBlock, er return resp, nil } -func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult { +func (app *BaseApp) deliverTx(tx []byte, txIndex int) *abci.ExecTxResult { + return app.deliverTxWithMultiStore(tx, txIndex, nil) +} + +func (app *BaseApp) deliverTxWithMultiStore(tx []byte, txIndex int, txMultiStore storetypes.MultiStore) *abci.ExecTxResult { gInfo := sdk.GasInfo{} resultStr := "successful" @@ -782,7 +789,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult { telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted") }() - gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx, nil) + gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, nil, txIndex, txMultiStore) if err != nil { resultStr = "failed" resp = sdkerrors.ResponseExecTxResultWithEvents( @@ -842,12 +849,19 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) { // both txbytes and the decoded tx are passed to runTx to avoid the state machine encoding the tx and decoding the transaction twice // passing the decoded tx to runTX is optional, it will be decoded if the tx is nil func (app *BaseApp) runTx(mode execMode, txBytes []byte, tx sdk.Tx) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) { + return app.runTxWithMultiStore(mode, txBytes, tx, -1, nil) +} + +func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, tx sdk.Tx, txIndex int, txMultiStore storetypes.MultiStore) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) { // NOTE: GasWanted should be returned by the AnteHandler. GasUsed is // determined by the GasMeter. We need access to the context to get the gas // meter, so we initialize upfront. var gasWanted uint64 - ctx := app.getContextForTx(mode, txBytes) + ctx := app.getContextForTx(mode, txBytes, txIndex) + if txMultiStore != nil { + ctx = ctx.WithMultiStore(txMultiStore) + } ms := ctx.MultiStore() // only run the tx if there is block gas remaining @@ -1040,6 +1054,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me break } + ctx = ctx.WithMsgIndex(i) + handler := app.msgServiceRouter.Handler(msg) if handler == nil { return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg) diff --git a/baseapp/genesis.go b/baseapp/genesis.go index 4662d1187b4a..2e5455adc3f5 100644 --- a/baseapp/genesis.go +++ b/baseapp/genesis.go @@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil) // ExecuteGenesisTx implements genesis.GenesisState from // cosmossdk.io/core/genesis to set initial state in genesis func (ba *BaseApp) ExecuteGenesisTx(tx []byte) error { - res := ba.deliverTx(tx) + res := ba.deliverTx(tx, -1) if res.Code != types.CodeTypeOK { return errors.New(res.Log) diff --git a/baseapp/options.go b/baseapp/options.go index 1f809e498b9c..ec32a241401f 100644 --- a/baseapp/options.go +++ b/baseapp/options.go @@ -129,6 +129,11 @@ func DisableBlockGasMeter() func(*BaseApp) { return func(app *BaseApp) { app.SetDisableBlockGasMeter(true) } } +// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution. +func SetTxExecutor(executor TxExecutor) func(*BaseApp) { + return func(app *BaseApp) { app.txExecutor = executor } +} + func (app *BaseApp) SetName(name string) { if app.sealed { panic("SetName() on sealed BaseApp") @@ -403,3 +408,8 @@ func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) { func (app *BaseApp) SetGRPCQueryRouter(grpcQueryRouter *GRPCQueryRouter) { app.grpcQueryRouter = grpcQueryRouter } + +// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution. +func (app *BaseApp) SetTxExecutor(executor TxExecutor) { + app.txExecutor = executor +} diff --git a/baseapp/test_helpers.go b/baseapp/test_helpers.go index b3aa396a0250..91914f972136 100644 --- a/baseapp/test_helpers.go +++ b/baseapp/test_helpers.go @@ -77,9 +77,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s } func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context { - return app.getContextForTx(execModeFinalize, txBytes) + return app.getContextForTx(execModeFinalize, txBytes, -1) } func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context { - return app.getContextForTx(execModeCheck, txBytes) + return app.getContextForTx(execModeCheck, txBytes, -1) } diff --git a/baseapp/txexecutor.go b/baseapp/txexecutor.go new file mode 100644 index 000000000000..230250ce9992 --- /dev/null +++ b/baseapp/txexecutor.go @@ -0,0 +1,16 @@ +package baseapp + +import ( + "context" + + abci "github.com/cometbft/cometbft/abci/types" + + "cosmossdk.io/store/types" +) + +type TxExecutor func( + ctx context.Context, + blockSize int, + cms types.MultiStore, + deliverTxWithMultiStore func(int, types.MultiStore) *abci.ExecTxResult, +) ([]*abci.ExecTxResult, error) diff --git a/types/context.go b/types/context.go index 5742ec41c85d..01f174253be8 100644 --- a/types/context.go +++ b/types/context.go @@ -64,6 +64,15 @@ type Context struct { streamingManager storetypes.StreamingManager cometInfo comet.BlockInfo headerInfo header.Info + + // the index of the current tx in the block, -1 means not in finalize block context + txIndex int + // the index of the current msg in the tx, -1 means not in finalize block context + msgIndex int + // the total number of transactions in current block + txCount int + // sum the gas used by all the transactions in the current block, only accessible by end blocker + blockGasUsed uint64 } // Proposed rename, not done to avoid API breakage @@ -92,6 +101,10 @@ func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.trans func (c Context) StreamingManager() storetypes.StreamingManager { return c.streamingManager } func (c Context) CometInfo() comet.BlockInfo { return c.cometInfo } func (c Context) HeaderInfo() header.Info { return c.headerInfo } +func (c Context) TxIndex() int { return c.txIndex } +func (c Context) MsgIndex() int { return c.msgIndex } +func (c Context) TxCount() int { return c.txCount } +func (c Context) BlockGasUsed() uint64 { return c.blockGasUsed } // BlockHeader returns the header by value. func (c Context) BlockHeader() cmtproto.Header { @@ -138,6 +151,8 @@ func NewContext(ms storetypes.MultiStore, header cmtproto.Header, isCheckTx bool eventManager: NewEventManager(), kvGasConfig: storetypes.KVGasConfig(), transientKVGasConfig: storetypes.TransientGasConfig(), + txIndex: -1, + msgIndex: -1, } } @@ -317,6 +332,26 @@ func (c Context) WithHeaderInfo(headerInfo header.Info) Context { return c } +func (c Context) WithTxIndex(txIndex int) Context { + c.txIndex = txIndex + return c +} + +func (c Context) WithTxCount(txCount int) Context { + c.txCount = txCount + return c +} + +func (c Context) WithMsgIndex(msgIndex int) Context { + c.msgIndex = msgIndex + return c +} + +func (c Context) WithBlockGasUsed(gasUsed uint64) Context { + c.blockGasUsed = gasUsed + return c +} + // TODO: remove??? func (c Context) IsZero() bool { return c.ms == nil From 37b5f9991b41e8094e276127d0605ee5e70fb355 Mon Sep 17 00:00:00 2001 From: yihuang Date: Wed, 3 Apr 2024 15:15:22 +0800 Subject: [PATCH 2/2] Problem: block gas used not set in context (#252) Solution: - fix the way context is updated --- CHANGELOG.md | 13 ++++++++----- baseapp/abci.go | 14 +++++++++++--- types/context.go | 8 ++++++++ 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1faa8f69d23..e22052a46c4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,14 @@ Ref: https://keepachangelog.com/en/1.0.0/ # Changelog +## [Unreleased-Upstream] + +### Features + +* (baseapp) [#205](https://github.com/crypto-org-chain/cosmos-sdk/pull/205) Add `TxExecutor` baseapp option, add `TxIndex`/`TxCount`/`MsgIndex`/`BlockGasUsed` fields to `Context, to support tx parallel execution. +* [#252](https://github.com/crypto-org-chain/cosmos-sdk/pull/252) Add `BlockGasWanted` to `Context` to support feemarket module. + + ## [v0.53.0](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.53.0) - 2025-04-29 ### Features @@ -166,11 +174,6 @@ Ref: https://keepachangelog.com/en/1.0.0/ ## [v0.50.9](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.50.9) - 2024-08-07 -### Features - -* (baseapp) [#205](https://github.com/crypto-org-chain/cosmos-sdk/pull/205) Add `TxExecutor` baseapp option, add `TxIndex`/`TxCount`/`MsgIndex`/`BlockGasUsed` fields to `Context, to support tx parallel execution. - -## [Unreleased-Upstream] ### Bug Fixes diff --git a/baseapp/abci.go b/baseapp/abci.go index 306f34bbf84d..496e454d2aa0 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -812,13 +812,21 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore) } - var blockGasUsed uint64 + var ( + blockGasUsed uint64 + blockGasWanted uint64 + ) for _, res := range txResults { blockGasUsed += uint64(res.GasUsed) + blockGasWanted += uint64(res.GasWanted) } - sdkCtx := app.finalizeBlockState.Context().WithBlockGasUsed(blockGasUsed) + app.finalizeBlockState.SetContext( + app.finalizeBlockState.Context(). + WithBlockGasUsed(blockGasUsed). + WithBlockGasWanted(blockGasWanted), + ) - endBlock, err := app.endBlock(sdkCtx) + endBlock, err := app.endBlock(ctx) if err != nil { return nil, err } diff --git a/types/context.go b/types/context.go index 01f174253be8..4c66bcad78ff 100644 --- a/types/context.go +++ b/types/context.go @@ -73,6 +73,8 @@ type Context struct { txCount int // sum the gas used by all the transactions in the current block, only accessible by end blocker blockGasUsed uint64 + // sum the gas wanted by all the transactions in the current block, only accessible by end blocker + blockGasWanted uint64 } // Proposed rename, not done to avoid API breakage @@ -105,6 +107,7 @@ func (c Context) TxIndex() int { return c.txInd func (c Context) MsgIndex() int { return c.msgIndex } func (c Context) TxCount() int { return c.txCount } func (c Context) BlockGasUsed() uint64 { return c.blockGasUsed } +func (c Context) BlockGasWanted() uint64 { return c.blockGasWanted } // BlockHeader returns the header by value. func (c Context) BlockHeader() cmtproto.Header { @@ -352,6 +355,11 @@ func (c Context) WithBlockGasUsed(gasUsed uint64) Context { return c } +func (c Context) WithBlockGasWanted(gasWanted uint64) Context { + c.blockGasWanted = gasWanted + return c +} + // TODO: remove??? func (c Context) IsZero() bool { return c.ms == nil