Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ Ref: https://keepachangelog.com/en/1.0.0/

# Changelog

## [Unreleased-Upstream]

### Features

* (baseapp) [#205](https://github.com/crypto-org-chain/cosmos-sdk/pull/205) Add `TxExecutor` baseapp option, add `TxIndex`/`TxCount`/`MsgIndex`/`BlockGasUsed` fields to `Context, to support tx parallel execution.
* [#252](https://github.com/crypto-org-chain/cosmos-sdk/pull/252) Add `BlockGasWanted` to `Context` to support feemarket module.


## [v0.53.0](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.53.0) - 2025-04-29

### Features
Expand Down Expand Up @@ -166,7 +174,8 @@ Ref: https://keepachangelog.com/en/1.0.0/

## [v0.50.9](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.50.9) - 2024-08-07

## Bug Fixes

### Bug Fixes

* (baseapp) [#21159](https://github.com/cosmos/cosmos-sdk/pull/21159) Return PreBlocker events in FinalizeBlockResponse.
* [#20939](https://github.com/cosmos/cosmos-sdk/pull/20939) Fix collection reverse iterator to include `pagination.key` in the result.
Expand Down
93 changes: 63 additions & 30 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,48 +791,42 @@

// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
app.finalizeBlockState.SetContext(
app.finalizeBlockState.Context().
WithBlockGasMeter(gasMeter).
WithTxCount(len(req.Txs)),
)

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
//
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
// vote extensions, so skip those.
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
for _, rawTx := range req.Txs {
var response *abci.ExecTxResult

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
txResults, err := app.executeTxs(ctx, req.Txs)
if err != nil {
// usually due to canceled
return nil, err
}

if app.finalizeBlockState.ms.TracingEnabled() {
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}

endBlock, err := app.endBlock(app.finalizeBlockState.Context())
var (
blockGasUsed uint64
blockGasWanted uint64
)
for _, res := range txResults {
blockGasUsed += uint64(res.GasUsed)

Check failure

Code scanning / gosec

integer overflow conversion uint64 -> uint32 Error

integer overflow conversion int64 -> uint64
blockGasWanted += uint64(res.GasWanted)

Check failure

Code scanning / gosec

integer overflow conversion uint64 -> uint32 Error

integer overflow conversion int64 -> uint64
}
app.finalizeBlockState.SetContext(
app.finalizeBlockState.Context().
WithBlockGasUsed(blockGasUsed).
WithBlockGasWanted(blockGasWanted),
)

endBlock, err := app.endBlock(ctx)
if err != nil {
return nil, err
}
Expand All @@ -856,6 +850,45 @@
}, nil
}

func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
if app.txExecutor != nil {
return app.txExecutor(ctx, len(txs), app.finalizeBlockState.ms, func(i int, ms storetypes.MultiStore) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(txs[i], i, ms)
})
}

txResults := make([]*abci.ExecTxResult, 0, len(txs))
for i, rawTx := range txs {
var response *abci.ExecTxResult

if _, err := app.txDecoder(rawTx); err == nil {
response = app.deliverTx(rawTx, i)
} else {
// In the case where a transaction included in a block proposal is malformed,
// we still want to return a default response to comet. This is because comet
// expects a response for each transaction included in a block proposal.
response = sdkerrors.ResponseExecTxResultWithEvents(
sdkerrors.ErrTxDecode,
0,
0,
nil,
false,
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
}
return txResults, nil
}

// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
// Specifically, it will execute an application's BeginBlock (if defined), followed
// by the transactions in the proposal, finally followed by the application's
Expand Down
28 changes: 22 additions & 6 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ type BaseApp struct {
//
// SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler.
disableBlockGasMeter bool

// Optional alternative tx executor, used for block-stm parallel transaction execution.
txExecutor TxExecutor
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down Expand Up @@ -674,17 +677,17 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter {
}

// retrieve the context for the tx w/ txBytes and other memoized values.
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context {
app.mu.Lock()
defer app.mu.Unlock()

modeState := app.getState(mode)
if modeState == nil {
panic(fmt.Sprintf("state is nil for mode %v", mode))
}
ctx := modeState.Context().
WithTxBytes(txBytes).
WithGasMeter(storetypes.NewInfiniteGasMeter())
WithGasMeter(storetypes.NewInfiniteGasMeter()).
WithTxIndex(txIndex)
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed

ctx = ctx.WithIsSigverifyTx(app.sigverifyTx)
Expand Down Expand Up @@ -769,7 +772,11 @@ func (app *BaseApp) beginBlock(_ *abci.RequestFinalizeBlock) (sdk.BeginBlock, er
return resp, nil
}

func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
func (app *BaseApp) deliverTx(tx []byte, txIndex int) *abci.ExecTxResult {
return app.deliverTxWithMultiStore(tx, txIndex, nil)
}

func (app *BaseApp) deliverTxWithMultiStore(tx []byte, txIndex int, txMultiStore storetypes.MultiStore) *abci.ExecTxResult {
gInfo := sdk.GasInfo{}
resultStr := "successful"

Expand All @@ -782,7 +789,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
}()

gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx, nil)
gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, nil, txIndex, txMultiStore)
if err != nil {
resultStr = "failed"
resp = sdkerrors.ResponseExecTxResultWithEvents(
Expand Down Expand Up @@ -842,12 +849,19 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
// both txbytes and the decoded tx are passed to runTx to avoid the state machine encoding the tx and decoding the transaction twice
// passing the decoded tx to runTX is optional, it will be decoded if the tx is nil
func (app *BaseApp) runTx(mode execMode, txBytes []byte, tx sdk.Tx) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
return app.runTxWithMultiStore(mode, txBytes, tx, -1, nil)
}

func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, tx sdk.Tx, txIndex int, txMultiStore storetypes.MultiStore) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
// determined by the GasMeter. We need access to the context to get the gas
// meter, so we initialize upfront.
var gasWanted uint64

ctx := app.getContextForTx(mode, txBytes)
ctx := app.getContextForTx(mode, txBytes, txIndex)
if txMultiStore != nil {
ctx = ctx.WithMultiStore(txMultiStore)
}
ms := ctx.MultiStore()

// only run the tx if there is block gas remaining
Expand Down Expand Up @@ -1040,6 +1054,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me
break
}

ctx = ctx.WithMsgIndex(i)

handler := app.msgServiceRouter.Handler(msg)
if handler == nil {
return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg)
Expand Down
2 changes: 1 addition & 1 deletion baseapp/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var _ genesis.TxHandler = (*BaseApp)(nil)
// ExecuteGenesisTx implements genesis.GenesisState from
// cosmossdk.io/core/genesis to set initial state in genesis
func (ba *BaseApp) ExecuteGenesisTx(tx []byte) error {
res := ba.deliverTx(tx)
res := ba.deliverTx(tx, -1)

if res.Code != types.CodeTypeOK {
return errors.New(res.Log)
Expand Down
10 changes: 10 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func DisableBlockGasMeter() func(*BaseApp) {
return func(app *BaseApp) { app.SetDisableBlockGasMeter(true) }
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
return func(app *BaseApp) { app.txExecutor = executor }
}

func (app *BaseApp) SetName(name string) {
if app.sealed {
panic("SetName() on sealed BaseApp")
Expand Down Expand Up @@ -403,3 +408,8 @@ func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
func (app *BaseApp) SetGRPCQueryRouter(grpcQueryRouter *GRPCQueryRouter) {
app.grpcQueryRouter = grpcQueryRouter
}

// SetTxExecutor sets a custom tx executor for the BaseApp, usually for parallel execution.
func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
app.txExecutor = executor
}
4 changes: 2 additions & 2 deletions baseapp/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func (app *BaseApp) NewUncachedContext(isCheckTx bool, header cmtproto.Header) s
}

func (app *BaseApp) GetContextForFinalizeBlock(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeFinalize, txBytes)
return app.getContextForTx(execModeFinalize, txBytes, -1)
}

func (app *BaseApp) GetContextForCheckTx(txBytes []byte) sdk.Context {
return app.getContextForTx(execModeCheck, txBytes)
return app.getContextForTx(execModeCheck, txBytes, -1)
}
16 changes: 16 additions & 0 deletions baseapp/txexecutor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package baseapp

import (
"context"

abci "github.com/cometbft/cometbft/abci/types"

"cosmossdk.io/store/types"
)

type TxExecutor func(
ctx context.Context,
blockSize int,
cms types.MultiStore,
deliverTxWithMultiStore func(int, types.MultiStore) *abci.ExecTxResult,
) ([]*abci.ExecTxResult, error)
43 changes: 43 additions & 0 deletions types/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ type Context struct {
streamingManager storetypes.StreamingManager
cometInfo comet.BlockInfo
headerInfo header.Info

// the index of the current tx in the block, -1 means not in finalize block context
txIndex int
// the index of the current msg in the tx, -1 means not in finalize block context
msgIndex int
// the total number of transactions in current block
txCount int
// sum the gas used by all the transactions in the current block, only accessible by end blocker
blockGasUsed uint64
// sum the gas wanted by all the transactions in the current block, only accessible by end blocker
blockGasWanted uint64
}

// Proposed rename, not done to avoid API breakage
Expand Down Expand Up @@ -92,6 +103,11 @@ func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.trans
func (c Context) StreamingManager() storetypes.StreamingManager { return c.streamingManager }
func (c Context) CometInfo() comet.BlockInfo { return c.cometInfo }
func (c Context) HeaderInfo() header.Info { return c.headerInfo }
func (c Context) TxIndex() int { return c.txIndex }
func (c Context) MsgIndex() int { return c.msgIndex }
func (c Context) TxCount() int { return c.txCount }
func (c Context) BlockGasUsed() uint64 { return c.blockGasUsed }
func (c Context) BlockGasWanted() uint64 { return c.blockGasWanted }

// BlockHeader returns the header by value.
func (c Context) BlockHeader() cmtproto.Header {
Expand Down Expand Up @@ -138,6 +154,8 @@ func NewContext(ms storetypes.MultiStore, header cmtproto.Header, isCheckTx bool
eventManager: NewEventManager(),
kvGasConfig: storetypes.KVGasConfig(),
transientKVGasConfig: storetypes.TransientGasConfig(),
txIndex: -1,
msgIndex: -1,
}
}

Expand Down Expand Up @@ -317,6 +335,31 @@ func (c Context) WithHeaderInfo(headerInfo header.Info) Context {
return c
}

func (c Context) WithTxIndex(txIndex int) Context {
c.txIndex = txIndex
return c
}

func (c Context) WithTxCount(txCount int) Context {
c.txCount = txCount
return c
}

func (c Context) WithMsgIndex(msgIndex int) Context {
c.msgIndex = msgIndex
return c
}

func (c Context) WithBlockGasUsed(gasUsed uint64) Context {
c.blockGasUsed = gasUsed
return c
}

func (c Context) WithBlockGasWanted(gasWanted uint64) Context {
c.blockGasWanted = gasWanted
return c
}

// TODO: remove???
func (c Context) IsZero() bool {
return c.ms == nil
Expand Down
Loading