Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions db/kv/membatchwithdb/memory_mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func NewMemoryBatch(tx kv.TemporalTx, tmpDir string, logger log.Logger) *MemoryM
}
}

func (m *MemoryMutation) UnderlyingTx() kv.TemporalTx {
return m.db
}

func (m *MemoryMutation) UpdateTxn(tx kv.TemporalTx) {
m.db = tx
m.statelessCursors = nil
Expand Down
11 changes: 11 additions & 0 deletions execution/execmodule/block_building.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,17 @@ func (e *EthereumExecutionModule) GetAssembledBlock(ctx context.Context, req *ex
payload.BlobGasUsed = header.BlobGasUsed
payload.ExcessBlobGas = header.ExcessBlobGas
}
blockAccessList := block.BlockAccessList()
if header.BlockAccessListHash != nil || blockAccessList != nil {
payload.Version = 4
if header.BlockAccessListHash != nil {
payload.BlockAccessListHash = gointerfaces.ConvertHashToH256(*header.BlockAccessListHash)
}
payload.BlockAccessList = types.ConvertBlockAccessListToTypesProto(blockAccessList)
if payload.BlockAccessList == nil {
payload.BlockAccessList = []*typesproto.BlockAccessListAccount{}
}
}

blockValue := blockValue(blockWithReceipts, baseFee)

Expand Down
60 changes: 52 additions & 8 deletions execution/stagedsync/exec3_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,25 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U
if pe.cfg.chainConfig.IsAmsterdam(applyResult.BlockTime) || pe.cfg.experimentalBAL {
bal := CreateBAL(applyResult.BlockNum, applyResult.TxIO, pe.cfg.dirs.DataDir)
log.Debug("bal", "blockNum", applyResult.BlockNum, "hash", bal.Hash(), "valid", bal.Validate() == nil)

if pe.cfg.chainConfig.IsAmsterdam(applyResult.BlockTime) {
headerBALHash := *lastHeader.BlockAccessListHash
if headerBALHash != b.BlockAccessList().Hash() {
log.Info(fmt.Sprintf("bal from block: %s", b.BlockAccessList().DebugString()))
return fmt.Errorf("block %d: invalid block access list, hash mismatch: got %s expected %s", applyResult.BlockNum, b.BlockAccessList().Hash(), headerBALHash)
if lastHeader.BlockAccessListHash == nil {
if pe.isBlockProduction {
hash := bal.Hash()
lastHeader.BlockAccessListHash = &hash
} else {
return fmt.Errorf("block %d: missing block access list hash", applyResult.BlockNum)
}
}
if headerBALHash != bal.Hash() {
log.Info(fmt.Sprintf("computed bal: %s", bal.DebugString()))
return fmt.Errorf("%w, block=%d: block access list mismatch: got %s expected %s", rules.ErrInvalidBlock, applyResult.BlockNum, bal.Hash(), headerBALHash)
headerBALHash := *lastHeader.BlockAccessListHash
if !pe.isBlockProduction {
if headerBALHash != b.BlockAccessList().Hash() {
log.Info(fmt.Sprintf("bal from block: %s", b.BlockAccessList().DebugString()))
return fmt.Errorf("block %d: invalid block access list, hash mismatch: got %s expected %s", applyResult.BlockNum, b.BlockAccessList().Hash(), headerBALHash)
}
if headerBALHash != bal.Hash() {
log.Info(fmt.Sprintf("computed bal: %s", bal.DebugString()))
return fmt.Errorf("%w, block=%d: block access list mismatch: got %s expected %s", rules.ErrInvalidBlock, applyResult.BlockNum, bal.Hash(), headerBALHash)
}
}
}
}
Expand Down Expand Up @@ -1715,3 +1724,38 @@ func mergeReadSets(a state.ReadSet, b state.ReadSet) state.ReadSet {
}
return out
}

func mergeVersionedWrites(prev, next state.VersionedWrites) state.VersionedWrites {
if len(prev) == 0 {
return next
}
if len(next) == 0 {
return prev
}
merged := state.WriteSet{}
for _, v := range prev {
merged.Set(*v)
}
for _, v := range next {
merged.Set(*v)
}
out := make(state.VersionedWrites, 0, merged.Len())
merged.Scan(func(v *state.VersionedWrite) bool {
out = append(out, v)
return true
})
return out
}

func mergeAccessedAddresses(dst, src map[accounts.Address]struct{}) map[accounts.Address]struct{} {
if len(src) == 0 {
return dst
}
if dst == nil {
dst = make(map[accounts.Address]struct{}, len(src))
}
for addr := range src {
dst[addr] = struct{}{}
}
return dst
}
1 change: 1 addition & 0 deletions execution/stagedsync/stage_mining_create_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type MiningBlock struct {
Receipts types.Receipts
Withdrawals []*types.Withdrawal
Requests types.FlatRequests
BlockAccessList types.BlockAccessList

headerRlpSize *int
withdrawalsRlpSize *int
Expand Down
150 changes: 123 additions & 27 deletions execution/stagedsync/stage_mining_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/erigontech/erigon/common/metrics"
"github.com/erigontech/erigon/db/kv"
"github.com/erigontech/erigon/db/kv/membatchwithdb"
"github.com/erigontech/erigon/db/kv/temporal"
"github.com/erigontech/erigon/db/rawdb"
"github.com/erigontech/erigon/db/services"
"github.com/erigontech/erigon/db/state/execctx"
Expand Down Expand Up @@ -99,9 +100,18 @@ func SpawnMiningExecStage(ctx context.Context, s *StageState, sd *execctx.Shared
chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)
logPrefix := s.LogPrefix()
current := cfg.miningState.MiningBlock
needBAL := execCfg.chainConfig.IsAmsterdam(current.Header.Time) || execCfg.experimentalBAL

stateReader := state.NewReaderV3(sd.AsGetter(tx))
ibs := state.New(stateReader)
var balIO *state.VersionedIO
var systemReads state.ReadSet
var systemWrites state.VersionedWrites
var systemAccess map[accounts.Address]struct{}
if needBAL {
ibs.SetVersionMap(state.NewVersionMap(nil))
balIO = &state.VersionedIO{}
}
// Clique consensus needs forced author in the evm context
//if cfg.chainConfig.Consensus == chain.CliqueConsensus {
// execCfg.author = &cfg.miningState.MiningConfig.Etherbase
Expand All @@ -128,6 +138,12 @@ func SpawnMiningExecStage(ctx context.Context, s *StageState, sd *execctx.Shared
txNum := sd.TxNum()

protocol.InitializeBlockExecution(cfg.engine, chainReader, current.Header, cfg.chainConfig, ibs, &state.NoopWriter{}, logger, nil)
if needBAL {
systemReads = mergeReadSets(systemReads, ibs.VersionedReads())
systemWrites = mergeVersionedWrites(systemWrites, ibs.VersionedWrites(false))
systemAccess = mergeAccessedAddresses(systemAccess, ibs.AccessedAddresses())
ibs.ResetVersionedIO()
}

coinbase := accounts.InternAddress(cfg.miningState.MiningConfig.Etherbase)

Expand All @@ -149,7 +165,7 @@ func SpawnMiningExecStage(ctx context.Context, s *StageState, sd *execctx.Shared
}

if len(txns) > 0 {
logs, stop, err := addTransactionsToMiningBlock(ctx, logPrefix, current, cfg.chainConfig, cfg.vmConfig, getHeader, cfg.engine, txns, coinbase, ibs, interrupt, cfg.payloadId, logger)
logs, stop, err := addTransactionsToMiningBlock(ctx, logPrefix, current, cfg.chainConfig, cfg.vmConfig, getHeader, cfg.engine, txns, coinbase, ibs, balIO, interrupt, cfg.payloadId, logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,49 +205,106 @@ func SpawnMiningExecStage(ctx context.Context, s *StageState, sd *execctx.Shared
}

var block *types.Block
if needBAL {
ibs.ResetVersionedIO()
}
block, current.Requests, err = protocol.FinalizeBlockExecution(cfg.engine, stateReader, current.Header, current.Txns, current.Uncles, &state.NoopWriter{}, cfg.chainConfig, ibs, current.Receipts, current.Withdrawals, chainReader, true, logger, nil)
if err != nil {
return fmt.Errorf("cannot finalize block execution: %s", err)
}

// Simulate the block execution to get the final state root
if err = rawdb.WriteHeader(tx, block.Header()); err != nil {
return fmt.Errorf("cannot write header: %s", err)
}
blockHeight := block.NumberU64()

if err = rawdb.WriteCanonicalHash(tx, block.Hash(), blockHeight); err != nil {
return fmt.Errorf("cannot write canonical hash: %s", err)
}
if err = rawdb.WriteHeadHeaderHash(tx, block.Hash()); err != nil {
return err
}
if _, err = rawdb.WriteRawBodyIfNotExists(tx, block.Hash(), blockHeight, block.RawBody()); err != nil {
return fmt.Errorf("cannot write body: %s", err)
}
if err = rawdb.AppendCanonicalTxNums(tx, blockHeight); err != nil {
return err
if needBAL {
systemReads = mergeReadSets(systemReads, ibs.VersionedReads())
systemWrites = mergeVersionedWrites(systemWrites, ibs.VersionedWrites(false))
systemAccess = mergeAccessedAddresses(systemAccess, ibs.AccessedAddresses())
ibs.ResetVersionedIO()

systemVersion := state.Version{BlockNum: blockHeight, TxIndex: -1}
balIO.RecordReads(systemVersion, systemReads)
balIO.RecordWrites(systemVersion, systemWrites)
balIO.RecordAccesses(systemVersion, systemAccess)
current.BlockAccessList = CreateBAL(blockHeight, balIO, execCfg.dirs.DataDir)
}
if err = stages.SaveStageProgress(tx, kv.Headers, blockHeight); err != nil {
return err
}
if err = stages.SaveStageProgress(tx, stages.Bodies, blockHeight); err != nil {
return err
writeBlockForExecution := func(rwTx kv.TemporalRwTx) error {
if err = rawdb.WriteHeader(rwTx, block.Header()); err != nil {
return fmt.Errorf("cannot write header: %s", err)
}
if err = rawdb.WriteCanonicalHash(rwTx, block.Hash(), blockHeight); err != nil {
return fmt.Errorf("cannot write canonical hash: %s", err)
}
if err = rawdb.WriteHeadHeaderHash(rwTx, block.Hash()); err != nil {
return err
}
if _, err = rawdb.WriteRawBodyIfNotExists(rwTx, block.Hash(), blockHeight, block.RawBody()); err != nil {
return fmt.Errorf("cannot write body: %s", err)
}
if err = rawdb.AppendCanonicalTxNums(rwTx, blockHeight); err != nil {
return err
}
if err = stages.SaveStageProgress(rwTx, kv.Headers, blockHeight); err != nil {
return err
}
if err = stages.SaveStageProgress(rwTx, stages.Bodies, blockHeight); err != nil {
return err
}
senderS := &StageState{state: s.state, ID: stages.Senders, BlockNumber: blockHeight - 1}
if err = SpawnRecoverSendersStage(sendersCfg, senderS, nil, rwTx, blockHeight, ctx, logger); err != nil {
return err
}
return nil
}
senderS := &StageState{state: s.state, ID: stages.Senders, BlockNumber: blockHeight - 1}
if err = SpawnRecoverSendersStage(sendersCfg, senderS, nil, tx, blockHeight, ctx, logger); err != nil {

// Simulate the block execution to get the final state root
if err = writeBlockForExecution(tx); err != nil {
return err
}

// This flag will skip checking the state root
execS := &StageState{state: s.state, ID: stages.Execution, BlockNumber: blockHeight - 1}
forceParallel := dbg.Exec3Parallel || cfg.chainConfig.IsAmsterdam(current.Header.Time)
execTx := tx
execSd := sd
var execCleanup func()
if forceParallel {
// get the underlying TemporalTx from MemoryMutation and create temporary SharedDomain
if _, ok := tx.(*temporal.RwTx); !ok {
type txUnwrapper interface {
UnderlyingTx() kv.TemporalTx
}
if unwrap, ok := tx.(txUnwrapper); ok {
if rwTx, ok := unwrap.UnderlyingTx().(kv.TemporalRwTx); ok {
tempSd, err := execctx.NewSharedDomains(ctx, rwTx, logger)
if err != nil {
return err
}
execTx = rwTx
execSd = tempSd
execCleanup = func() {
tempSd.Close()
}
if err = writeBlockForExecution(execTx); err != nil {
execCleanup()
return err
}
}
}
}
if _, ok := execTx.(*temporal.RwTx); !ok {
return fmt.Errorf("parallel execution requires *temporal.RwTx, got %T", execTx)
}
}
if execCleanup != nil {
defer execCleanup()
}

if err = ExecV3(ctx, execS, u, execCfg, sd, tx, dbg.Exec3Parallel, blockHeight, logger); err != nil {
if err = ExecV3(ctx, execS, u, execCfg, execSd, execTx, forceParallel, blockHeight, logger); err != nil {
logger.Error("cannot execute block execution", "err", err)
return err
}

rh, err := sd.ComputeCommitment(ctx, tx, true, blockHeight, txNum, s.LogPrefix(), nil)
commitmentTxNum := execSd.TxNum()
rh, err := execSd.ComputeCommitment(ctx, execTx, true, blockHeight, commitmentTxNum, s.LogPrefix(), nil)
if err != nil {
return fmt.Errorf("compute commitment failed: %w", err)
}
Expand Down Expand Up @@ -426,12 +499,13 @@ func addTransactionsToMiningBlock(
txns types.Transactions,
coinbase accounts.Address,
ibs *state.IntraBlockState,
balIO *state.VersionedIO,
interrupt *atomic.Bool,
payloadId uint64,
logger log.Logger,
) (types.Logs, bool, error) {
header := current.Header
txnIdx := ibs.TxnIndex() + 1
txnIdx := ibs.TxnIndex()
gasPool := new(protocol.GasPool).AddGas(header.GasLimit - header.GasUsed)
if header.BlobGasUsed != nil {
gasPool.AddBlobGas(chainConfig.GetMaxBlobGasPerBlock(header.Time) - *header.BlobGasUsed)
Expand All @@ -440,6 +514,23 @@ func addTransactionsToMiningBlock(

var coalescedLogs types.Logs
noop := state.NewNoopWriter()
recordTxIO := func() {
if balIO == nil {
return
}
version := ibs.Version()
balIO.RecordReads(version, ibs.VersionedReads())
balIO.RecordWrites(version, ibs.VersionedWrites(false))
balIO.RecordAccesses(version, ibs.AccessedAddresses())
ibs.ResetVersionedIO()
}
clearTxIO := func() {
if balIO == nil {
return
}
ibs.AccessedAddresses()
ibs.ResetVersionedIO()
}

var miningCommitTx = func(txn types.Transaction, coinbase accounts.Address, vmConfig *vm.Config, chainConfig *chain.Config, ibs *state.IntraBlockState, current *MiningBlock) ([]*types.Log, error) {
ibs.SetTxContext(current.Header.Number.Uint64(), txnIdx)
Expand Down Expand Up @@ -552,6 +643,11 @@ LOOP:

// Start executing the transaction
logs, err := miningCommitTx(txn, coinbase, vmConfig, chainConfig, ibs, current)
if err == nil {
recordTxIO()
} else {
clearTxIO()
}
if errors.Is(err, protocol.ErrGasLimitReached) {
// Skip the env out-of-gas transaction
logger.Debug(fmt.Sprintf("[%s] Gas limit exceeded for env block", logPrefix), "hash", txn.Hash(), "sender", from)
Expand Down
8 changes: 8 additions & 0 deletions execution/stagedsync/stage_mining_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package stagedsync
import (
"fmt"

"github.com/erigontech/erigon/common/empty"
"github.com/erigontech/erigon/common/log/v3"
"github.com/erigontech/erigon/db/kv"
"github.com/erigontech/erigon/db/services"
Expand Down Expand Up @@ -69,6 +70,13 @@ func SpawnMiningFinishStage(s *StageState, sd *execctx.SharedDomains, tx kv.Temp
//}

block := types.NewBlockForAsembling(current.Header, current.Txns, current.Uncles, current.Receipts, current.Withdrawals)
if current.BlockAccessList != nil {
block.SetBlockAccessList(current.BlockAccessList)
if block.BlockAccessListHash() == nil {
hash := empty.BlockAccessListHash
block.HeaderNoCopy().BlockAccessListHash = &hash
}
}
blockWithReceipts := &types.BlockWithReceipts{Block: block, Receipts: current.Receipts, Requests: current.Requests}
*current = MiningBlock{} // hack to clean global data

Expand Down
6 changes: 6 additions & 0 deletions execution/state/intra_block_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -1959,6 +1959,12 @@ func (sdb *IntraBlockState) VersionedReads() ReadSet {
return sdb.versionedReads
}

func (sdb *IntraBlockState) ResetVersionedIO() {
sdb.versionedReads = nil
sdb.versionedWrites = nil
sdb.dep = UnknownDep
}

// VersionedWrites returns the current versioned write set if this block
// checkDirty - is mainly for testing, for block processing this is called
// after the block execution is completed and non dirty writes (due to reversions)
Expand Down
Loading
Loading