diff --git a/core/blockchain.go b/core/blockchain.go
index d0e93c3eea..4afa7fc91a 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -86,11 +86,6 @@ var (
 	storageCacheHitMeter  = metrics.NewRegisteredMeter("chain/storage/reads/cache/process/hit", nil)
 	storageCacheMissMeter = metrics.NewRegisteredMeter("chain/storage/reads/cache/process/miss", nil)
 
-	accountCacheHitPrefetchMeter  = metrics.NewRegisteredMeter("chain/account/reads/cache/prefetch/hit", nil)
-	accountCacheMissPrefetchMeter = metrics.NewRegisteredMeter("chain/account/reads/cache/prefetch/miss", nil)
-	storageCacheHitPrefetchMeter  = metrics.NewRegisteredMeter("chain/storage/reads/cache/prefetch/hit", nil)
-	storageCacheMissPrefetchMeter = metrics.NewRegisteredMeter("chain/storage/reads/cache/prefetch/miss", nil)
-
 	accountReadSingleTimer = metrics.NewRegisteredResettingTimer("chain/account/single/reads", nil) //nolint:revive,unused
 	storageReadSingleTimer = metrics.NewRegisteredResettingTimer("chain/storage/single/reads", nil) //nolint:revive,unused
 	snapshotCommitTimer    = metrics.NewRegisteredResettingTimer("chain/snapshot/commits", nil)
@@ -127,6 +122,9 @@ var (
 	blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil)
 	blockPrefetchTxsValidMeter   = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil)
 
+	// Warm-up timing (reader cache seeding performed by warmReaderCache)
+	triewarmTimer = metrics.NewRegisteredTimer("chain/warm", nil)
+
 	errInsertionInterrupted = errors.New("insertion is interrupted")
 	errChainStopped         = errors.New("blockchain is stopped")
 	errInvalidOldChain      = errors.New("invalid old chain")
@@ -204,9 +202,11 @@ type BlockChainConfig struct {
 	ChainHistoryMode history.HistoryMode
 
 	// Misc options
-	NoPrefetch bool            // Whether to disable heuristic state prefetching when processing blocks
-	Overrides  *ChainOverrides // Optional chain config overrides
-	VmConfig   vm.Config       // Config options for the EVM Interpreter
+	NoPrefetch   bool            // Whether to disable heuristic state prefetching when processing blocks
+	WaitForWarm  bool            // If true, block execution waits for the warm-up to finish
+	WarmInWorker bool            // If true, warming is done in miner worker; import-time warming is disabled
+	Overrides    *ChainOverrides // Optional chain config overrides
+	VmConfig     vm.Config       // Config options for the EVM Interpreter
 
 	// TxLookupLimit specifies the maximum number of blocks from head for which
 	// transaction hashes will be indexed.
@@ -240,6 +240,7 @@ func DefaultConfig() *BlockChainConfig {
 	// This is appropriate for most unit tests.
TxLookupLimit: -1, VmConfig: vm.Config{}, + WarmInWorker: false, } } @@ -391,6 +392,13 @@ type BlockChain struct { chain2HeadFeed event.Feed // Reorg/NewHead/Fork data feed chainSideFeed event.Feed // Side chain data feed (removed from geth but needed in bor) checker ethereum.ChainValidator + + // Last reader cache stats from ProcessBlock (read by insertChain for logging) + lastProcAccHit int64 + lastProcAccMiss int64 + lastProcStorHit int64 + lastProcStorMiss int64 + lastWarmSeedDur time.Duration } // NewBlockChain returns a fully initialised block chain using information @@ -663,7 +671,7 @@ func NewParallelBlockChain(db ethdb.Database, genesis *Genesis, engine consensus return bc, nil } -func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, witness *stateless.Witness, followupInterrupt *atomic.Bool) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) { +func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, witness *stateless.Witness, followupInterrupt *atomic.Bool, warmCh <-chan warmOutcome) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) { // Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -689,14 +697,32 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, wit } parentRoot := parent.Root - prefetch, process, err := bc.statedb.ReadersWithCacheStats(parentRoot) + _, process, err := bc.statedb.ReadersWithCacheStats(parentRoot) if err != nil { return nil, nil, 0, nil, 0, err } - throwaway, err := state.NewWithReader(parentRoot, bc.statedb, prefetch) - if err != nil { - return nil, nil, 0, nil, 0, err + // If an async warm was started earlier for this root, wait for it to finish. + var warmedProc state.ReaderWithStats + if warmCh != nil { + if res, ok := <-warmCh; ok { + warmedProc = res.Reader + bc.lastWarmSeedDur = res.Dur + if res.Dur > 0 { + triewarmTimer.Update(res.Dur) + } + } + } + // Prefer an asynchronously warmed reader if available. 
+ if warmedProc != nil { + process = warmedProc + } else { + bc.lastWarmSeedDur = 0 } + // Create a prefetch statedb for background prefetch + // throwaway, err := state.NewWithReader(parentRoot, bc.statedb, prefetch) + // if err != nil { + // return nil, nil, 0, nil, 0, err + // } statedb, err := state.NewWithReader(parentRoot, bc.statedb, process) if err != nil { return nil, nil, 0, nil, 0, err @@ -708,29 +734,28 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, wit // Upload the statistics of reader at the end defer func() { - stats := prefetch.GetStats() - accountCacheHitPrefetchMeter.Mark(stats.AccountHit) - accountCacheMissPrefetchMeter.Mark(stats.AccountMiss) - storageCacheHitPrefetchMeter.Mark(stats.StorageHit) - storageCacheMissPrefetchMeter.Mark(stats.StorageMiss) - stats = process.GetStats() + stats := process.GetStats() accountCacheHitMeter.Mark(stats.AccountHit) accountCacheMissMeter.Mark(stats.AccountMiss) storageCacheHitMeter.Mark(stats.StorageHit) storageCacheMissMeter.Mark(stats.StorageMiss) + bc.lastProcAccHit += stats.AccountHit + bc.lastProcAccMiss += stats.AccountMiss + bc.lastProcStorHit += stats.StorageHit + bc.lastProcStorMiss += stats.StorageMiss }() - go func(start time.Time, throwaway *state.StateDB, block *types.Block) { - // Disable tracing for prefetcher executions. - vmCfg := bc.cfg.VmConfig - vmCfg.Tracer = nil - bc.prefetcher.Prefetch(block, throwaway, vmCfg, followupInterrupt) + // go func(start time.Time, throwaway *state.StateDB, block *types.Block) { + // // Disable tracing for prefetcher executions. + // vmCfg := bc.cfg.VmConfig + // vmCfg.Tracer = nil + // bc.prefetcher.Prefetch(block, throwaway, vmCfg, followupInterrupt) - blockPrefetchExecuteTimer.Update(time.Since(start)) - if followupInterrupt.Load() { - blockPrefetchInterruptMeter.Mark(1) - } - }(time.Now(), throwaway, block) + // blockPrefetchExecuteTimer.Update(time.Since(start)) + // if followupInterrupt.Load() { + // blockPrefetchInterruptMeter.Mark(1) + // } + // }(time.Now(), throwaway, block) type Result struct { receipts types.Receipts @@ -822,6 +847,23 @@ func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, wit return result.receipts, result.logs, result.usedGas, result.statedb, vtime, result.err } +// warmReaderCache preloads the supplied readerWithCache with accounts/storage and trie nodes. +func (bc *BlockChain) warmReaderCache(parentRoot common.Hash, reader state.ReaderWithStats, b *types.Block, followupInterrupt *atomic.Bool) (state.ReaderWithStats, time.Duration) { + if reader == nil || b == nil || len(b.Transactions()) == 0 { + return nil, 0 + } + vmCfg := bc.cfg.VmConfig + vmCfg.Tracer = nil + warmdb, err := state.NewWithReader(parentRoot, bc.statedb, reader) + if err != nil { + return nil, 0 + } + + start := time.Now() + bc.prefetcher.Prefetch(b, warmdb, vmCfg, followupInterrupt) + return reader, time.Since(start) +} + func (bc *BlockChain) setupSnapshot() { // Short circuit if the chain is established with path scheme, as the // state snapshot has been integrated into path database natively. 
@@ -2639,7 +2681,7 @@ func (bc *BlockChain) insertChainStatelessParallel(chain types.Blocks, witnesses snapDiffItems, snapBufItems = bc.snaps.Size() } trieDiffNodes, trieBufNodes, _ := bc.triedb.Size() - stats.report(chain, i, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, true, true) + stats.report(chain, i, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, true, true, bc) continue } @@ -2716,7 +2758,7 @@ func (bc *BlockChain) insertChainStatelessParallel(chain types.Blocks, witnesses snapDiffItems, snapBufItems = bc.snaps.Size() } trieDiffNodes, trieBufNodes, _ := bc.triedb.Size() - stats.report(chain, i, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, true, true) + stats.report(chain, i, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, true, true, bc) } return int(processed.Load()), nil @@ -2820,7 +2862,7 @@ func (bc *BlockChain) insertChainStatelessSequential(chain types.Blocks, witness snapDiffItems, snapBufItems = bc.snaps.Size() } trieDiffNodes, trieBufNodes, _ := bc.triedb.Size() - stats.report(chain, i, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, true, true) + stats.report(chain, i, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, true, true, bc) } // End-of-batch witness validation for i, block := range chain { @@ -3111,6 +3153,10 @@ func (bc *BlockChain) insertChainWithWitnesses(chain types.Blocks, setHead bool, if parent == nil { parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) } + + // Start reader cache warming + warmCh := bc.StartWarmReaderCache(parent.Root, block) + statedb, err := state.New(parent.Root, bc.statedb) if err != nil { return nil, it.index, err @@ -3170,7 +3216,11 @@ func (bc *BlockChain) insertChainWithWitnesses(chain types.Blocks, setHead bool, } } - receipts, logs, usedGas, statedb, vtime, err := bc.ProcessBlock(block, parent, witness, &followupInterrupt) + var execWarmCh <-chan warmOutcome + if bc.cfg.WaitForWarm { + execWarmCh = warmCh + } + receipts, logs, usedGas, statedb, vtime, err := bc.ProcessBlock(block, parent, witness, &followupInterrupt, execWarmCh) bc.statedb.TrieDB().SetReadBackend(nil) bc.statedb.EnableSnapInReader() activeState = statedb @@ -3208,6 +3258,15 @@ func (bc *BlockChain) insertChainWithWitnesses(chain types.Blocks, setHead bool, blockExecutionTimer.Update(ptime - trieRead) // The time spent on EVM processing blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation borConsensusTime.Update(statedb.BorConsensusTime) // The time spent on bor consensus (span + state sync) + + execOnly := (ptime - trieRead) + stateCalc := (triehash + trieUpdate + trieRead) + stats.execDur += execOnly + stats.stateCalcDur += stateCalc + if bc.lastWarmSeedDur > 0 { + stats.warmDur += bc.lastWarmSeedDur + } + stats.valDur += (vtime - (triehash + trieUpdate)) // Write the block to the chain and get the status. var ( wstart = time.Now() @@ -3260,7 +3319,7 @@ func (bc *BlockChain) insertChainWithWitnesses(chain types.Blocks, setHead bool, snapDiffItems, snapBufItems = bc.snaps.Size() } trieDiffNodes, trieBufNodes, _ := bc.triedb.Size() - stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead, false) + stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead, false, bc) /* // Print confirmation that a future fork is scheduled, but not yet active. 
@@ -4297,3 +4356,51 @@ func (bc *BlockChain) verifyPendingHeaders() {
 		}
 	}
 }
+
+type warmOutcome struct {
+	Reader state.ReaderWithStats
+	Dur    time.Duration
+}
+
+// StartWarmReaderCache launches a goroutine to warm up a state reader's cache.
+func (bc *BlockChain) StartWarmReaderCache(parentRoot common.Hash, b *types.Block) <-chan warmOutcome {
+	ch := make(chan warmOutcome, 1)
+	if b == nil || len(b.Transactions()) == 0 || bc.prefetcher == nil || (bc.cfg != nil && bc.cfg.WarmInWorker) {
+		close(ch)
+		return ch
+	}
+	go func() {
+		// Build process reader and warm it
+		_, process, err := bc.statedb.ReadersWithCacheStats(parentRoot)
+		if err == nil && process != nil {
+			if warmed, dur := bc.warmReaderCache(parentRoot, process, b, nil); warmed != nil {
+				ch <- warmOutcome{Reader: warmed, Dur: dur}
+				close(ch)
+				return
+			}
+		}
+		close(ch)
+	}()
+	return ch
+}
+
+// WaitForWarmEnabled reports whether execution should wait for warm-up to finish.
+func (bc *BlockChain) WaitForWarmEnabled() bool {
+	if bc.cfg == nil {
+		return false
+	}
+	return bc.cfg.WaitForWarm
+}
+
+// WarmInWorkerEnabled reports whether warming should be done in miner worker.
+func (bc *BlockChain) WarmInWorkerEnabled() bool {
+	if bc.cfg == nil {
+		return false
+	}
+	return bc.cfg.WarmInWorker
+}
+
+// NewStateWithReader creates a new StateDB bound to the given reader for the root.
+func (bc *BlockChain) NewStateWithReader(root common.Hash, rdr state.ReaderWithStats) (*state.StateDB, error) {
+	return state.NewWithReader(root, bc.statedb, rdr)
+}
diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go
index e9763a457a..b483cd676b 100644
--- a/core/blockchain_insert.go
+++ b/core/blockchain_insert.go
@@ -30,6 +30,10 @@ type insertStats struct {
 	queued, processed, ignored int
 	usedGas                    uint64
 	startTime                  mclock.AbsTime
+	execDur      time.Duration
+	stateCalcDur time.Duration
+	valDur       time.Duration
+	warmDur      time.Duration
 }
 
 // statsReportLimit is the time limit during import and export after which we
@@ -38,7 +42,7 @@ const statsReportLimit = 8 * time.Second
 
 // report prints statistics if some number of blocks have been processed
 // or more than a few seconds have passed since the last message.
-func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes common.StorageSize, setHead bool, stateless bool) {
+func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes common.StorageSize, setHead bool, stateless bool, bc *BlockChain) {
 	// Fetch the timings for the batch
 	var (
 		now = mclock.Now()
@@ -61,6 +65,18 @@ func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, sn
 		"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
 		"elapsed", common.PrettyDuration(elapsed), "mgasps", mgasps,
 	}
+	if st.execDur != 0 {
+		context = append(context, []interface{}{"exec", common.PrettyDuration(st.execDur)}...)
+	}
+	if st.valDur != 0 {
+		context = append(context, []interface{}{"validation", common.PrettyDuration(st.valDur)}...)
+	}
+	if st.stateCalcDur != 0 {
+		context = append(context, []interface{}{"statecalc", common.PrettyDuration(st.stateCalcDur)}...)
+	}
+	if st.warmDur != 0 {
+		context = append(context, []interface{}{"warm", common.PrettyDuration(st.warmDur)}...)
+	}
 	if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
 		context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
 	}
@@ -70,6 +86,20 @@ func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, sn
 			context = append(context, []interface{}{"snapdirty", snapBufItems}...)
 		}
 	}
+	// Append cache hit/miss percentages
+	if bc != nil {
+		if total := bc.lastProcAccHit + bc.lastProcAccMiss; total > 0 {
+			pct := int64(100 * bc.lastProcAccHit / total)
+			context = append(context, []interface{}{"cacheproc_acc_pct", pct}...)
+		}
+		if total := bc.lastProcStorHit + bc.lastProcStorMiss; total > 0 {
+			pct := int64(100 * bc.lastProcStorHit / total)
+			context = append(context, []interface{}{"cacheproc_stor_pct", pct}...)
+		}
+		// reset for next segment
+		bc.lastProcAccHit, bc.lastProcAccMiss = 0, 0
+		bc.lastProcStorHit, bc.lastProcStorMiss = 0, 0
+	}
 	if trieDiffNodes != 0 { // pathdb
 		context = append(context, []interface{}{"triediffs", trieDiffNodes}...)
 	}
diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 0856a91368..2ff24503e8 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -181,7 +181,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
 		if err != nil {
 			return err
 		}
-		receipts, logs, usedGas, statedb, _, err := blockchain.ProcessBlock(block, blockchain.GetBlockByHash(block.ParentHash()).Header(), nil, nil)
+		receipts, logs, usedGas, statedb, _, err := blockchain.ProcessBlock(block, blockchain.GetBlockByHash(block.ParentHash()).Header(), nil, nil, nil)
 		res := &ProcessResult{
 			Receipts: receipts,
 			Logs:     logs,
diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go
index ad0fb4a04e..2d87a9ef01 100644
--- a/core/parallel_state_processor.go
+++ b/core/parallel_state_processor.go
@@ -95,6 +95,66 @@ type ExecutionTask struct {
 	blockContext vm.BlockContext
 }
 
+// NewWarmExecTasks creates a list of ExecutionTask objects for warming up the state reader cache.
+func NewWarmExecTasks(bc *BlockChain, chainCfg *params.ChainConfig, header *types.Header, baseState *state.StateDB, coinbase common.Address, txs []*types.Transaction, evmCfg vm.Config) ([]blockstm.ExecTask, error) { + tasks := make([]blockstm.ExecTask, 0, len(txs)) + if len(txs) == 0 { + return tasks, nil + } + + finalState := baseState.Copy() + var ( + receipts types.Receipts + allLogs []*types.Log + usedGas = new(uint64) + ) + // Create block and EVM block context + blockNumber := header.Number + blockHash := header.Hash() + blockTime := header.Time + blockCtx := NewEVMBlockContext(header, bc, &coinbase) + shouldDelayFeeCal := false + + for i, tx := range txs { + if tx == nil { + continue + } + if tx.Type() == types.StateSyncTxType { + continue + } + msg, err := TransactionToMessage(tx, types.MakeSigner(chainCfg, header.Number, header.Time), header.BaseFee) + if err != nil { + continue + } + cleanState := baseState.Copy() + task := &ExecutionTask{ + msg: *msg, + config: chainCfg, + gasLimit: header.GasLimit, + blockNumber: blockNumber, + blockHash: blockHash, + blockTime: blockTime, + tx: tx, + index: i, + cleanStateDB: cleanState, + finalStateDB: finalState, + blockChain: bc, + header: header, + evmConfig: evmCfg, + shouldDelayFeeCal: &shouldDelayFeeCal, + sender: msg.From, + totalUsedGas: usedGas, + receipts: &receipts, + allLogs: &allLogs, + dependencies: nil, + coinbase: coinbase, + blockContext: blockCtx, + } + tasks = append(tasks, task) + } + return tasks, nil +} + func (task *ExecutionTask) Execute(mvh *blockstm.MVHashMap, incarnation int) (err error) { task.statedb = task.cleanStateDB.Copy() task.statedb.SetTxContext(task.tx.Hash(), task.index) diff --git a/core/state/reader.go b/core/state/reader.go index 6790a44c1e..e776ef486c 100644 --- a/core/state/reader.go +++ b/core/state/reader.go @@ -350,9 +350,7 @@ func newMultiStateReader(readers ...StateReader) (*multiStateReader, error) { if len(readers) == 0 { return nil, errors.New("empty reader set") } - return &multiStateReader{ - readers: readers, - }, nil + return &multiStateReader{readers: readers}, nil } // Account implementing StateReader interface, retrieving the account associated @@ -515,7 +513,7 @@ func (r *readerWithCache) Storage(addr common.Address, slot common.Hash) (common return value, err } -type readerWithCacheStats struct { +type ReaderWithCacheStats struct { *readerWithCache accountHit atomic.Int64 accountMiss atomic.Int64 @@ -524,8 +522,8 @@ type readerWithCacheStats struct { } // newReaderWithCacheStats constructs the reader with additional statistics tracked. -func newReaderWithCacheStats(reader *readerWithCache) *readerWithCacheStats { - return &readerWithCacheStats{ +func newReaderWithCacheStats(reader *readerWithCache) *ReaderWithCacheStats { + return &ReaderWithCacheStats{ readerWithCache: reader, } } @@ -534,7 +532,7 @@ func newReaderWithCacheStats(reader *readerWithCache) *readerWithCacheStats { // The returned account might be nil if it's not existent. // // An error will be returned if the state is corrupted in the underlying reader. -func (r *readerWithCacheStats) Account(addr common.Address) (*types.StateAccount, error) { +func (r *ReaderWithCacheStats) Account(addr common.Address) (*types.StateAccount, error) { account, incache, err := r.readerWithCache.account(addr) if err != nil { return nil, err @@ -552,7 +550,7 @@ func (r *readerWithCacheStats) Account(addr common.Address) (*types.StateAccount // existent. // // An error will be returned if the state is corrupted in the underlying reader. 
-func (r *readerWithCacheStats) Storage(addr common.Address, slot common.Hash) (common.Hash, error) { +func (r *ReaderWithCacheStats) Storage(addr common.Address, slot common.Hash) (common.Hash, error) { value, incache, err := r.readerWithCache.storage(addr, slot) if err != nil { return common.Hash{}, err @@ -566,7 +564,7 @@ func (r *readerWithCacheStats) Storage(addr common.Address, slot common.Hash) (c } // GetStats implements ReaderWithStats, returning the statistics of state reader. -func (r *readerWithCacheStats) GetStats() ReaderStats { +func (r *ReaderWithCacheStats) GetStats() ReaderStats { return ReaderStats{ AccountHit: r.accountHit.Load(), AccountMiss: r.accountMiss.Load(), @@ -574,3 +572,48 @@ func (r *readerWithCacheStats) GetStats() ReaderStats { StorageMiss: r.storageMiss.Load(), } } + +// SeedCaches inserts the supplied accounts and storage slots directly into the +// local caches. +func (r *ReaderWithCacheStats) SeedCaches(accounts map[common.Address]*types.StateAccount, storage map[common.Address]map[common.Hash]common.Hash) { + if r == nil || r.readerWithCache == nil { + return + } + + if len(accounts) > 0 { + r.accountLock.Lock() + for addr, acct := range accounts { + // Only add if not already cached + if _, exists := r.accounts[addr]; exists { + continue + } + if acct != nil { + r.accounts[addr] = acct + } + } + r.accountLock.Unlock() + } + + if len(storage) > 0 { + for addr, slots := range storage { + if len(slots) == 0 { + continue + } + bucket := &r.storageBuckets[addr[0]&0x0f] + bucket.lock.Lock() + dst, ok := bucket.storages[addr] + if !ok { + dst = make(map[common.Hash]common.Hash, len(slots)) + bucket.storages[addr] = dst + } + for k, v := range slots { + // Only add if not already cached + if _, exists := dst[k]; exists { + continue + } + dst[k] = v + } + bucket.lock.Unlock() + } + } +} diff --git a/core/state/statedb.go b/core/state/statedb.go index 5f5c3c5c5f..033bb819a0 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -173,6 +173,38 @@ type StateDB struct { BorConsensusTime time.Duration } +// OriginalRoot returns the pre-state root the StateDB was constructed with. +func (s *StateDB) OriginalRoot() common.Hash { + return s.originalRoot +} + +// PrefetchTouched returns pre-state accounts and storage values that were +// touched in this StateDB during execution. +func (s *StateDB) PrefetchTouched() (map[common.Address]*types.StateAccount, map[common.Address]map[common.Hash]common.Hash) { + accounts := make(map[common.Address]*types.StateAccount) + storage := make(map[common.Address]map[common.Hash]common.Hash) + + for addr, obj := range s.stateObjects { + if obj == nil { + continue + } + if obj.origin != nil { + accounts[addr] = obj.origin + } + if len(obj.originStorage) > 0 { + dst := storage[addr] + if dst == nil { + dst = make(map[common.Hash]common.Hash, len(obj.originStorage)) + storage[addr] = dst + } + for k, v := range obj.originStorage { + dst[k] = v + } + } + } + return accounts, storage +} + // New creates a new state from a given trie. func New(root common.Hash, db Database) (*StateDB, error) { reader, err := db.Reader(root) diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index cdbb44b130..531b8d73ee 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -50,12 +50,17 @@ func newStatePrefetcher(config *params.ChainConfig, chain *HeaderChain) *statePr // the transaction messages using the statedb, but any changes are discarded. The // only goal is to warm the state caches. 
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool) { + type prefetchResult struct { + accounts map[common.Address]*types.StateAccount + storage map[common.Address]map[common.Hash]common.Hash + } var ( fails atomic.Int64 header = block.Header() signer = types.MakeSigner(p.config, header.Number, header.Time) workers errgroup.Group reader = statedb.Reader() + results = make([]prefetchResult, len(block.Transactions())) ) workers.SetLimit(max(1, 4*runtime.NumCPU()/5)) // Aggressively run the prefetching @@ -111,18 +116,79 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c fails.Add(1) return nil // Ugh, something went horribly wrong, bail out } - // Pre-load trie nodes for the intermediate root. - // - // This operation incurs significant memory allocations due to - // trie hashing and node decoding. TODO(rjl493456442): investigate - // ways to mitigate this overhead. + acc, stor := stateCpy.PrefetchTouched() + results[i] = prefetchResult{accounts: acc, storage: stor} stateCpy.IntermediateRoot(true) return nil }) } workers.Wait() + accList := make([]map[common.Address]*types.StateAccount, len(results)) + storList := make([]map[common.Address]map[common.Hash]common.Hash, len(results)) + for i, res := range results { + accList[i] = res.accounts + storList[i] = res.storage + } + accAgg, storAgg := aggregatePrefetch(accList, storList) + seedPrefetchCaches(reader, accAgg, storAgg) + blockPrefetchTxsValidMeter.Mark(int64(len(block.Transactions())) - fails.Load()) blockPrefetchTxsInvalidMeter.Mark(fails.Load()) - return +} + +// aggregatePrefetch combines multiple prefetch results into a single aggregated +// set of accounts and storage slots. +func aggregatePrefetch( + accList []map[common.Address]*types.StateAccount, + storList []map[common.Address]map[common.Hash]common.Hash, +) (map[common.Address]*types.StateAccount, map[common.Address]map[common.Hash]common.Hash) { + accAgg := make(map[common.Address]*types.StateAccount) + storAgg := make(map[common.Address]map[common.Hash]common.Hash) + + for _, acc := range accList { + for a, acct := range acc { + if _, exists := accAgg[a]; !exists { + accAgg[a] = acct + } + } + } + for _, stor := range storList { + for a, bucket := range stor { + dst := storAgg[a] + if dst == nil { + dst = make(map[common.Hash]common.Hash, len(bucket)) + storAgg[a] = dst + } + for k, v := range bucket { + if _, exists := dst[k]; !exists { + dst[k] = v + } + } + } + } + return accAgg, storAgg +} + +// seedPrefetchCaches warms the reader caches with aggregated accounts and storage. 
+func seedPrefetchCaches( + reader state.Reader, + accAgg map[common.Address]*types.StateAccount, + storAgg map[common.Address]map[common.Hash]common.Hash, +) { + if len(accAgg) == 0 && len(storAgg) == 0 { + return + } + if statReader, ok := reader.(*state.ReaderWithCacheStats); ok && statReader != nil { + statReader.SeedCaches(accAgg, storAgg) + return + } + for a := range accAgg { + reader.Account(a) + } + for a, bucket := range storAgg { + for k := range bucket { + reader.Storage(a, k) + } + } } diff --git a/eth/backend.go b/eth/backend.go index 73a548cba8..97c52d146f 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -258,6 +258,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { options := &core.BlockChainConfig{ TrieCleanLimit: config.TrieCleanCache, NoPrefetch: config.NoPrefetch, + WaitForWarm: config.WaitForWarm, + WarmInWorker: config.WarmInWorker, TrieDirtyLimit: config.TrieDirtyCache, ArchiveMode: config.NoPruning, TrieTimeLimit: config.TrieTimeout, diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 6ccfee2a2e..68df7b2214 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -103,8 +103,11 @@ type Config struct { SnapDiscoveryURLs []string // State options. - NoPruning bool // Whether to disable pruning and flush everything to disk - NoPrefetch bool // Whether to disable prefetching and only load state on demand + NoPruning bool // Whether to disable pruning and flush everything to disk + NoPrefetch bool // Whether to disable prefetching and only load state on demand + WaitForWarm bool // If true, block execution waits for warm-up to finish + // If true, warm-up runs in miner worker (not during import). + WarmInWorker bool // Deprecated: use 'TransactionHistory' instead. TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved. 
diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go index 5d14a338fe..cc03953965 100644 --- a/eth/ethconfig/gen_config.go +++ b/eth/ethconfig/gen_config.go @@ -27,6 +27,8 @@ func (c Config) MarshalTOML() (interface{}, error) { SnapDiscoveryURLs []string NoPruning bool NoPrefetch bool + WaitForWarm bool + WarmInWorker bool TxLookupLimit uint64 `toml:",omitempty"` TransactionHistory uint64 `toml:",omitempty"` LogHistory uint64 `toml:",omitempty"` @@ -94,6 +96,8 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.SnapDiscoveryURLs = c.SnapDiscoveryURLs enc.NoPruning = c.NoPruning enc.NoPrefetch = c.NoPrefetch + enc.WaitForWarm = c.WaitForWarm + enc.WarmInWorker = c.WarmInWorker enc.TxLookupLimit = c.TxLookupLimit enc.TransactionHistory = c.TransactionHistory enc.LogHistory = c.LogHistory @@ -161,6 +165,8 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { SnapDiscoveryURLs []string NoPruning *bool NoPrefetch *bool + WaitForWarm *bool + WarmInWorker *bool TxLookupLimit *uint64 `toml:",omitempty"` TransactionHistory *uint64 `toml:",omitempty"` LogHistory *uint64 `toml:",omitempty"` @@ -247,6 +253,12 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.NoPrefetch != nil { c.NoPrefetch = *dec.NoPrefetch } + if dec.WaitForWarm != nil { + c.WaitForWarm = *dec.WaitForWarm + } + if dec.WarmInWorker != nil { + c.WarmInWorker = *dec.WarmInWorker + } if dec.TxLookupLimit != nil { c.TxLookupLimit = *dec.TxLookupLimit } diff --git a/go.mod b/go.mod index 44f313f115..0e0887f671 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/ethereum/go-ethereum // Note: Change the go image version in Dockerfile if you change this. -go 1.24.6 +go 1.24.8 require ( github.com/0xPolygon/crand v1.0.3 @@ -21,7 +21,7 @@ require ( github.com/cloudflare/cloudflare-go v0.114.0 github.com/cockroachdb/pebble v1.1.5 github.com/cometbft/cometbft v0.38.17 - github.com/consensys/gnark-crypto v0.18.0 + github.com/consensys/gnark-crypto v0.18.1 github.com/cosmos/cosmos-sdk v0.50.14 github.com/cosmos/gogoproto v1.7.0 github.com/crate-crypto/go-eth-kzg v1.3.0 diff --git a/go.sum b/go.sum index 24b2bdab05..61a1f82043 100644 --- a/go.sum +++ b/go.sum @@ -289,8 +289,8 @@ github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1: github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/cometbft/cometbft-db v0.14.1 h1:SxoamPghqICBAIcGpleHbmoPqy+crij/++eZz3DlerQ= github.com/cometbft/cometbft-db v0.14.1/go.mod h1:KHP1YghilyGV/xjD5DP3+2hyigWx0WTp9X+0Gnx0RxQ= -github.com/consensys/gnark-crypto v0.18.0 h1:vIye/FqI50VeAr0B3dx+YjeIvmc3LWz4yEfbWBpTUf0= -github.com/consensys/gnark-crypto v0.18.0/go.mod h1:L3mXGFTe1ZN+RSJ+CLjUt9x7PNdx8ubaYfDROyp2Z8c= +github.com/consensys/gnark-crypto v0.18.1 h1:RyLV6UhPRoYYzaFnPQA4qK3DyuDgkTgskDdoGqFt3fI= +github.com/consensys/gnark-crypto v0.18.1/go.mod h1:L3mXGFTe1ZN+RSJ+CLjUt9x7PNdx8ubaYfDROyp2Z8c= github.com/containerd/continuity v0.4.5 h1:ZRoN1sXq9u7V6QoHMcVWGhOwDFqZ4B9i5H6un1Wh0x4= github.com/containerd/continuity v0.4.5/go.mod h1:/lNJvtJKUQStBzpVQ1+rasXO1LAWtUQssk28EZvJ3nE= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index cc4abd2876..8f1b2bfc41 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -602,6 +602,12 @@ type CacheConfig struct { // NoPrefetch is used to 
disable prefetch of tries NoPrefetch bool `hcl:"noprefetch,optional" toml:"noprefetch,optional"` + // WaitForWarm, if true, waits for warm-up to finish before executing blocks + WaitForWarm bool `hcl:"waitforwarm,optional" toml:"waitforwarm,optional"` + + // WarmInWorker, if true, runs warm-up in miner worker (not during import) + WarmInWorker bool `hcl:"warminworker,optional" toml:"warminworker,optional"` + // Preimages is used to enable the track of hash preimages Preimages bool `hcl:"preimages,optional" toml:"preimages,optional"` @@ -1288,6 +1294,8 @@ func (c *Config) buildEth(stack *node.Node, accountManager *accounts.Manager) (* n.TrieCleanCache = calcPerc(c.Cache.PercTrie) n.TrieDirtyCache = calcPerc(c.Cache.PercGc) n.NoPrefetch = c.Cache.NoPrefetch + n.WaitForWarm = c.Cache.WaitForWarm + n.WarmInWorker = c.Cache.WarmInWorker n.Preimages = c.Cache.Preimages // Note that even the values set by `history.transactions` will be written in the old flag until it's removed. n.TransactionHistory = c.Cache.TxLookupLimit diff --git a/internal/cli/server/flags.go b/internal/cli/server/flags.go index 3b1db12b43..50eeee518a 100644 --- a/internal/cli/server/flags.go +++ b/internal/cli/server/flags.go @@ -461,6 +461,20 @@ func (c *Command) Flags(config *Config) *flagset.Flagset { Default: c.cliConfig.Cache.NoPrefetch, Group: "Cache", }) + f.BoolFlag(&flagset.BoolFlag{ + Name: "cache.waitforwarm", + Usage: "Wait for prefetch warm-up to finish before executing blocks", + Value: &c.cliConfig.Cache.WaitForWarm, + Default: c.cliConfig.Cache.WaitForWarm, + Group: "Cache", + }) + f.BoolFlag(&flagset.BoolFlag{ + Name: "cache.warminworker", + Usage: "Run prefetch warm-up in miner worker instead of during block import", + Value: &c.cliConfig.Cache.WarmInWorker, + Default: c.cliConfig.Cache.WarmInWorker, + Group: "Cache", + }) f.BoolFlag(&flagset.BoolFlag{ Name: "cache.preimages", Usage: "Enable recording the SHA3/keccak preimages of trie keys", diff --git a/miner/worker.go b/miner/worker.go index c5168e96f3..b25061e3b7 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" "math/big" + "runtime" + "sort" "sync" "sync/atomic" "time" @@ -926,6 +928,48 @@ func (w *worker) makeEnv(parent *types.Header, header *types.Header, coinbase co return env, nil } +// selectHeavyAccounts returns a subset of accounts from the provided plain/blob maps, +// preferring accounts with higher head-tx gas first, until the cumulative head gas +// reaches the provided gas limit +func selectHeavyAccounts(plain, blob map[common.Address][]*txpool.LazyTransaction, gasLimit uint64) map[common.Address]struct{} { + type accountSet struct { + addr common.Address + gas uint64 + } + cands := make([]accountSet, 0, len(plain)+len(blob)) + for a, txs := range plain { + if len(txs) == 0 { + continue + } + cands = append(cands, accountSet{addr: a, gas: txs[0].Gas}) + } + for a, txs := range blob { + if len(txs) == 0 { + continue + } + cands = append(cands, accountSet{addr: a, gas: txs[0].Gas}) + } + sort.Slice(cands, func(i, j int) bool { return cands[i].gas > cands[j].gas }) + var sum uint64 + selected := make(map[common.Address]struct{}) + for _, c := range cands { + if _, ok := selected[c.addr]; ok { + continue + } + if sum >= gasLimit { + break + } + selected[c.addr] = struct{}{} + // Cap to block gas limit + if gasLimit-sum < c.gas { + sum = gasLimit + } else { + sum += c.gas + } + } + return selected +} + // updateSnapshot updates pending snapshot block, receipts and state. 
func (w *worker) updateSnapshot(env *environment) { w.snapshotMu.Lock() @@ -1451,22 +1495,74 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err } } - // Fill the block with all available pending transactions. - if len(prioPlainTxs) > 0 || len(prioBlobTxs) > 0 { - plainTxs := newTransactionsByPriceAndNonce(env.signer, prioPlainTxs, env.header.BaseFee, &w.interruptBlockBuilding) - blobTxs := newTransactionsByPriceAndNonce(env.signer, prioBlobTxs, env.header.BaseFee, &w.interruptBlockBuilding) + var plainTxsPrio, blobTxsPrio *transactionsByPriceAndNonce + var plainTxsNormal, blobTxsNormal *transactionsByPriceAndNonce + { + if len(prioPlainTxs) > 0 || len(prioBlobTxs) > 0 { + plainTxsPrio = newTransactionsByPriceAndNonce(env.signer, prioPlainTxs, env.header.BaseFee, &w.interruptBlockBuilding) + blobTxsPrio = newTransactionsByPriceAndNonce(env.signer, prioBlobTxs, env.header.BaseFee, &w.interruptBlockBuilding) + } + if len(normalPlainTxs) > 0 || len(normalBlobTxs) > 0 { + heapInitTime := time.Now() + plainTxsNormal = newTransactionsByPriceAndNonce(env.signer, normalPlainTxs, env.header.BaseFee, &w.interruptBlockBuilding) + blobTxsNormal = newTransactionsByPriceAndNonce(env.signer, normalBlobTxs, env.header.BaseFee, &w.interruptBlockBuilding) + txHeapInitTimer.Update(time.Since(heapInitTime)) + } + // Flatten pending txs into a slice (no capping) + txs := make([]*types.Transaction, 0) + appendMap := func(m map[common.Address][]*txpool.LazyTransaction) { + for _, list := range m { + for _, ltx := range list { + if ltx == nil || ltx.Tx == nil { + continue + } + txs = append(txs, ltx.Tx) + } + } + } + appendMap(prioPlainTxs) + appendMap(prioBlobTxs) + appendMap(normalPlainTxs) + appendMap(normalBlobTxs) + + // Filter txs for warming and start warm reader cache + if len(txs) > 0 && w.chain.WarmInWorkerEnabled() { + selectedPrio := selectHeavyAccounts(prioPlainTxs, prioBlobTxs, env.header.GasLimit) + selectedNormal := selectHeavyAccounts(normalPlainTxs, normalBlobTxs, env.header.GasLimit) + + // Filter txs slice to only include selected accounts + filteredTxs := make([]*types.Transaction, 0) + for _, tx := range txs { + from, _ := types.Sender(env.signer, tx) + if _, ok := selectedPrio[from]; ok { + filteredTxs = append(filteredTxs, tx) + } else if _, ok := selectedNormal[from]; ok { + filteredTxs = append(filteredTxs, tx) + } + } + txs = filteredTxs - if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt, new(uint256.Int)); err != nil { - return err + // start warm reader cache + tasks, _ := core.NewWarmExecTasks(w.chain, w.chainConfig, env.header, env.state, env.coinbase, txs, vm.Config{}) + numProcs := max(1, 4*runtime.NumCPU()/5) + if w.chain.WaitForWarmEnabled() { + _, _ = blockstm.ExecuteParallel(tasks, false, false, numProcs, context.Background()) + } else { + go func() { + _, _ = blockstm.ExecuteParallel(tasks, false, false, numProcs, context.Background()) + }() + } } } - if len(normalPlainTxs) > 0 || len(normalBlobTxs) > 0 { - heapInitTime := time.Now() - plainTxs := newTransactionsByPriceAndNonce(env.signer, normalPlainTxs, env.header.BaseFee, &w.interruptBlockBuilding) - blobTxs := newTransactionsByPriceAndNonce(env.signer, normalBlobTxs, env.header.BaseFee, &w.interruptBlockBuilding) - txHeapInitTimer.Update(time.Since(heapInitTime)) - if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt, new(uint256.Int)); err != nil { + // commit transactions + if plainTxsPrio != nil || blobTxsPrio != nil { + if err := w.commitTransactions(env, 
plainTxsPrio, blobTxsPrio, interrupt, new(uint256.Int)); err != nil { + return err + } + } + if plainTxsNormal != nil || blobTxsNormal != nil { + if err := w.commitTransactions(env, plainTxsNormal, blobTxsNormal, interrupt, new(uint256.Int)); err != nil { return err } } diff --git a/triedb/pathdb/reader.go b/triedb/pathdb/reader.go index b7b18f13f9..26f43156d7 100644 --- a/triedb/pathdb/reader.go +++ b/triedb/pathdb/reader.go @@ -57,7 +57,7 @@ type reader struct { db *Database state common.Hash noHashCheck bool - layer layer + layer layer // entry layer (may be a diff layer) } // Node implements database.NodeReader interface, retrieving the node with specified @@ -174,8 +174,8 @@ func (db *Database) NodeReader(root common.Hash) (database.NodeReader, error) { if layer == nil { return nil, fmt.Errorf("state %#x is not available", root) } - return &reader{ - db: db, + + return &reader{db: db, state: root, noHashCheck: db.isVerkle, layer: layer,