-
Notifications
You must be signed in to change notification settings - Fork 561
Add prefetch trie nodes #1871
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: src-optimizations
Are you sure you want to change the base?
Add prefetch trie nodes #1871
Changes from 24 commits
07daffb
ccab3fe
25c5569
b4b035c
04544cf
0d0128b
c7ac291
7f58d8f
1c88a2a
f0d1c50
653424c
8ef31e3
6b1c3f1
46529c1
fa10617
59ea31f
e37cd47
0031a7e
86198b6
31d812c
624867a
aef9546
e263992
ca67765
854e5e2
ab0993b
4bdc1f4
10fb3df
821e07e
bda9e9d
8ba0702
5069130
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| # ./log_analyzer.sh "3 days ago" # Scans the last 3 days | ||
| # | ||
| # ============================================================================== | ||
|
|
||
| # Set the time range for the log scan. Use the first command-line argument | ||
| # if it's provided, otherwise default to "12 hours ago". | ||
| TIME_RANGE=${1:-"12 hours ago"} | ||
|
|
||
| echo "🔍 Scanning 'bor' service logs from the last '$TIME_RANGE'..." | ||
| echo "📊 Generating histogram for single-block import times ('blocks=1')..." | ||
| echo "📊 Extracting segment averages (execavg vs statecalcavg vs validation) if present..." | ||
| echo "" | ||
|
|
||
| # --- Main Logic --- | ||
| # The main pipeline is wrapped in a command group `{ ... }` to fix the sorting. | ||
| # 1. The `echo` commands run first, printing a static header for the output. | ||
| # 2. `journalctl`: Fetches the logs for the 'bor' unit. | ||
| # 3. `awk`: Processes the log lines. | ||
| # - It collects data into the `histogram` array just as before. | ||
| # - The `END` block is now simplified: it *only* prints the data rows. | ||
| # The header printing has been moved outside the awk script. | ||
| # 4. `sort -n`: This now receives only the data rows, so it sorts them | ||
| # numerically without misplacing the header. | ||
|
|
||
| { | ||
| # Print the header first and separately. | ||
| echo "Time Range Count" | ||
| echo "--------------------" | ||
|
|
||
| # Process the logs and print only the data. | ||
| journalctl -u bor --since "$TIME_RANGE" --no-pager | \ | ||
| awk ' | ||
| /Imported/ && /blocks=1 / { | ||
| # This regex now captures both the number and the unit (s or ms) | ||
| if (match($0, /elapsed=([0-9.]+)(ms|s)/, m)) { | ||
| elapsed_val = m[1] # The numerical value | ||
| unit = m[2] # The unit ("ms" or "s") | ||
| # If the unit is milliseconds, convert it to seconds | ||
| if (unit == "ms") { | ||
| elapsed_val = elapsed_val / 1000 | ||
| } | ||
| # Determine the correct bucket and increment the count | ||
| bucket = int(elapsed_val) | ||
| histogram[bucket]++ | ||
| } | ||
| } | ||
| END { | ||
| # This block now ONLY prints the histogram data, not the header. | ||
| for (bucket in histogram) { | ||
| printf "%-12s %d\n", sprintf("%d-%ds:", bucket, bucket + 1), histogram[bucket] | ||
| } | ||
| } | ||
| ' | \ | ||
| sort -n | ||
| } | ||
|
|
||
| # Extract per-block timing breakdowns from BlockTiming logs | ||
| echo "" | ||
| echo "EndBlock Blocks Exec(ms) StateCalc(ms) Validation(ms) Elapsed(s)" | ||
| echo "------------------------------------------------------------------" | ||
| journalctl -u bor --since "$TIME_RANGE" --no-pager | \ | ||
| grep "Imported" | \ | ||
| awk ' | ||
| { | ||
| endblk=""; blocks=""; execdur=""; statedur=""; elapsed="" | ||
| for (i=1; i<=NF; i++) { | ||
| if ($i ~ /number=/) { split($i,a,"="); endblk=a[2] } | ||
| else if ($i ~ /blocks=/) { split($i,a,"="); blocks=a[2] } | ||
| else if ($i ~ /exec=/) { sub(/exec=/, "", $i); execdur=$i } | ||
| else if ($i ~ /statecalc=/) { sub(/statecalc=/, "", $i); statedur=$i } | ||
| else if ($i ~ /elapsed=/) { sub(/elapsed=/, "", $i); elapsed=$i } | ||
| } | ||
| fnorm_ms = function(x) { | ||
| # Normalize PrettyDuration to ms (supports ms, s, and us/µs) | ||
| if (x ~ /ms$/) { sub(/ms$/, "", x); return x+0 } | ||
| if (x ~ /µs$/) { sub(/µs$/, "", x); return (x+0)/1000 } | ||
| if (x ~ /us$/) { sub(/us$/, "", x); return (x+0)/1000 } | ||
| if (x ~ /s$/) { sub(/s$/, "", x); return (x+0)*1000 } | ||
| return x+0 | ||
| } | ||
| fnorm_s = function(x) { | ||
| if (x ~ /ms$/) { sub(/ms$/, "", x); return (x+0)/1000 } | ||
| if (x ~ /µs$/) { sub(/µs$/, "", x); return (x+0)/1e6 } | ||
| if (x ~ /us$/) { sub(/us$/, "", x); return (x+0)/1e6 } | ||
| if (x ~ /s$/) { sub(/s$/, "", x); return (x+0) } | ||
| return x+0 | ||
| } | ||
| if (endblk!="" && blocks!="") { | ||
| e = (execdur!="") ? fnorm_ms(execdur) : 0 | ||
| s = (statedur!="") ? fnorm_ms(statedur) : 0 | ||
| el = (elapsed!="") ? fnorm_s(elapsed) : 0 | ||
| v = (el>0) ? (el*1000 - e - s) : 0 | ||
| printf "%8s %7s %9.2f %13.2f %14.2f %11.2f\n", endblk, blocks, e, s, v, el | ||
| } | ||
| } | ||
| ' | ||
|
|
||
| echo "" | ||
| echo "✅ Analysis complete." | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,10 +86,10 @@ | |
| storageCacheHitMeter = metrics.NewRegisteredMeter("chain/storage/reads/cache/process/hit", nil) | ||
| storageCacheMissMeter = metrics.NewRegisteredMeter("chain/storage/reads/cache/process/miss", nil) | ||
|
|
||
| accountCacheHitPrefetchMeter = metrics.NewRegisteredMeter("chain/account/reads/cache/prefetch/hit", nil) | ||
|
||
| accountCacheMissPrefetchMeter = metrics.NewRegisteredMeter("chain/account/reads/cache/prefetch/miss", nil) | ||
| storageCacheHitPrefetchMeter = metrics.NewRegisteredMeter("chain/storage/reads/cache/prefetch/hit", nil) | ||
| storageCacheMissPrefetchMeter = metrics.NewRegisteredMeter("chain/storage/reads/cache/prefetch/miss", nil) | ||
|
|
||
| accountReadSingleTimer = metrics.NewRegisteredResettingTimer("chain/account/single/reads", nil) //nolint:revive,unused | ||
| storageReadSingleTimer = metrics.NewRegisteredResettingTimer("chain/storage/single/reads", nil) //nolint:revive,unused | ||
|
|
@@ -122,11 +122,14 @@ | |
| blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) | ||
| blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) | ||
|
|
||
| blockPrefetchExecuteTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/executes", nil) | ||
| blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) | ||
| blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil) | ||
| blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil) | ||
|
|
||
| // Warm-up timing (seedReaderCache) | ||
| triewarmTimer = metrics.NewRegisteredTimer("chain/warm", nil) | ||
|
|
||
| errInsertionInterrupted = errors.New("insertion is interrupted") | ||
| errChainStopped = errors.New("blockchain is stopped") | ||
| errInvalidOldChain = errors.New("invalid old chain") | ||
|
|
@@ -204,9 +207,10 @@ | |
| ChainHistoryMode history.HistoryMode | ||
|
|
||
| // Misc options | ||
| NoPrefetch bool // Whether to disable heuristic state prefetching when processing blocks | ||
| Overrides *ChainOverrides // Optional chain config overrides | ||
| VmConfig vm.Config // Config options for the EVM Interpreter | ||
| NoPrefetch bool // Whether to disable heuristic state prefetching when processing blocks | ||
| WaitForWarm bool // If true, block execution waits for the warm-up to finish | ||
| Overrides *ChainOverrides // Optional chain config overrides | ||
| VmConfig vm.Config // Config options for the EVM Interpreter | ||
|
|
||
| // TxLookupLimit specifies the maximum number of blocks from head for which | ||
| // transaction hashes will be indexed. | ||
|
|
@@ -391,6 +395,13 @@ | |
| chain2HeadFeed event.Feed // Reorg/NewHead/Fork data feed | ||
| chainSideFeed event.Feed // Side chain data feed (removed from geth but needed in bor) | ||
| checker ethereum.ChainValidator | ||
|
|
||
| // Last reader cache stats from ProcessBlock (read by insertChain for logging) | ||
| lastProcAccHit int64 | ||
| lastProcAccMiss int64 | ||
| lastProcStorHit int64 | ||
| lastProcStorMiss int64 | ||
| lastWarmSeedDur time.Duration | ||
| } | ||
|
|
||
| // NewBlockChain returns a fully initialised block chain using information | ||
|
|
@@ -663,13 +674,13 @@ | |
| return bc, nil | ||
| } | ||
|
|
||
| func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, witness *stateless.Witness, followupInterrupt *atomic.Bool) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) { | ||
| func (bc *BlockChain) ProcessBlock(block *types.Block, parent *types.Header, witness *stateless.Witness, followupInterrupt *atomic.Bool, warmCh <-chan warmOutcome) (_ types.Receipts, _ []*types.Log, _ uint64, _ *state.StateDB, vtime time.Duration, blockEndErr error) { | ||
| // Process the block using processor and parallelProcessor at the same time, take the one which finishes first, cancel the other, and return the result | ||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| defer cancel() | ||
|
|
||
| if followupInterrupt == nil { | ||
| followupInterrupt = &atomic.Bool{} | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is the linter right here?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not sure. I didn't make this change. |
||
| } | ||
|
|
||
| if bc.logger != nil && bc.logger.OnBlockStart != nil { | ||
|
|
@@ -689,14 +700,32 @@ | |
| } | ||
|
|
||
| parentRoot := parent.Root | ||
| prefetch, process, err := bc.statedb.ReadersWithCacheStats(parentRoot) | ||
| _, process, err := bc.statedb.ReadersWithCacheStats(parentRoot) | ||
| if err != nil { | ||
| return nil, nil, 0, nil, 0, err | ||
| } | ||
| throwaway, err := state.NewWithReader(parentRoot, bc.statedb, prefetch) | ||
| if err != nil { | ||
| return nil, nil, 0, nil, 0, err | ||
| // If an async warm was started earlier for this root, wait for it to finish. | ||
| var warmedProc state.ReaderWithStats | ||
| if warmCh != nil { | ||
| if res, ok := <-warmCh; ok { | ||
| warmedProc = res.Reader | ||
| bc.lastWarmSeedDur = res.Dur | ||
| if res.Dur > 0 { | ||
| triewarmTimer.Update(res.Dur) | ||
| } | ||
| } | ||
| } | ||
| // Prefer an asynchronously warmed reader if available. | ||
| if warmedProc != nil { | ||
| process = warmedProc | ||
| } else { | ||
| bc.lastWarmSeedDur = 0 | ||
| } | ||
| // Create a prefetch statedb for background prefetch | ||
| // throwaway, err := state.NewWithReader(parentRoot, bc.statedb, prefetch) | ||
| // if err != nil { | ||
| // return nil, nil, 0, nil, 0, err | ||
| // } | ||
| statedb, err := state.NewWithReader(parentRoot, bc.statedb, process) | ||
| if err != nil { | ||
| return nil, nil, 0, nil, 0, err | ||
|
|
@@ -708,29 +737,28 @@ | |
|
|
||
| // Upload the statistics of reader at the end | ||
| defer func() { | ||
| stats := prefetch.GetStats() | ||
| accountCacheHitPrefetchMeter.Mark(stats.AccountHit) | ||
| accountCacheMissPrefetchMeter.Mark(stats.AccountMiss) | ||
| storageCacheHitPrefetchMeter.Mark(stats.StorageHit) | ||
| storageCacheMissPrefetchMeter.Mark(stats.StorageMiss) | ||
| stats = process.GetStats() | ||
| stats := process.GetStats() | ||
| accountCacheHitMeter.Mark(stats.AccountHit) | ||
| accountCacheMissMeter.Mark(stats.AccountMiss) | ||
| storageCacheHitMeter.Mark(stats.StorageHit) | ||
| storageCacheMissMeter.Mark(stats.StorageMiss) | ||
| bc.lastProcAccHit += stats.AccountHit | ||
| bc.lastProcAccMiss += stats.AccountMiss | ||
| bc.lastProcStorHit += stats.StorageHit | ||
| bc.lastProcStorMiss += stats.StorageMiss | ||
| }() | ||
|
|
||
| go func(start time.Time, throwaway *state.StateDB, block *types.Block) { | ||
| // Disable tracing for prefetcher executions. | ||
| vmCfg := bc.cfg.VmConfig | ||
| vmCfg.Tracer = nil | ||
| bc.prefetcher.Prefetch(block, throwaway, vmCfg, followupInterrupt) | ||
| // go func(start time.Time, throwaway *state.StateDB, block *types.Block) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this worth deleting?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I commented it out so that we don't run multiple prefetches together. I'll probably remove this once we finalize all the optimizations. |
||
| // // Disable tracing for prefetcher executions. | ||
| // vmCfg := bc.cfg.VmConfig | ||
| // vmCfg.Tracer = nil | ||
| // bc.prefetcher.Prefetch(block, throwaway, vmCfg, followupInterrupt) | ||
|
|
||
| blockPrefetchExecuteTimer.Update(time.Since(start)) | ||
| if followupInterrupt.Load() { | ||
| blockPrefetchInterruptMeter.Mark(1) | ||
| } | ||
| }(time.Now(), throwaway, block) | ||
| // blockPrefetchExecuteTimer.Update(time.Since(start)) | ||
| // if followupInterrupt.Load() { | ||
| // blockPrefetchInterruptMeter.Mark(1) | ||
| // } | ||
| // }(time.Now(), throwaway, block) | ||
|
|
||
| type Result struct { | ||
| receipts types.Receipts | ||
|
|
@@ -822,6 +850,23 @@ | |
| return result.receipts, result.logs, result.usedGas, result.statedb, vtime, result.err | ||
| } | ||
|
|
||
| // warmReaderCache preloads the supplied readerWithCache with accounts/storage and trie nodes. | ||
| func (bc *BlockChain) warmReaderCache(parentRoot common.Hash, reader state.ReaderWithStats, b *types.Block, followupInterrupt *atomic.Bool) (state.ReaderWithStats, time.Duration) { | ||
| if reader == nil || b == nil || len(b.Transactions()) == 0 { | ||
| return nil, 0 | ||
| } | ||
| vmCfg := bc.cfg.VmConfig | ||
| vmCfg.Tracer = nil | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why is this required? Same for |
||
| warmdb, err := state.NewWithReader(parentRoot, bc.statedb, reader) | ||
| if err != nil { | ||
| return nil, 0 | ||
| } | ||
|
|
||
| start := time.Now() | ||
| bc.prefetcher.Prefetch(b, warmdb, vmCfg, followupInterrupt) | ||
| return reader, time.Since(start) | ||
| } | ||
|
|
||
| func (bc *BlockChain) setupSnapshot() { | ||
| // Short circuit if the chain is established with path scheme, as the | ||
| // state snapshot has been integrated into path database natively. | ||
|
|
@@ -2639,7 +2684,7 @@ | |
| snapDiffItems, snapBufItems = bc.snaps.Size() | ||
| } | ||
| trieDiffNodes, trieBufNodes, _ := bc.triedb.Size() | ||
| stats.report(chain, i, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, true, true) | ||
| stats.report(chain, i, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, true, true, bc) | ||
| continue | ||
| } | ||
|
|
||
|
|
@@ -2716,7 +2761,7 @@ | |
| snapDiffItems, snapBufItems = bc.snaps.Size() | ||
| } | ||
| trieDiffNodes, trieBufNodes, _ := bc.triedb.Size() | ||
| stats.report(chain, i, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, true, true) | ||
| stats.report(chain, i, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, true, true, bc) | ||
| } | ||
|
|
||
| return int(processed.Load()), nil | ||
|
|
@@ -2820,7 +2865,7 @@ | |
| snapDiffItems, snapBufItems = bc.snaps.Size() | ||
| } | ||
| trieDiffNodes, trieBufNodes, _ := bc.triedb.Size() | ||
| stats.report(chain, i, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, true, true) | ||
| stats.report(chain, i, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, true, true, bc) | ||
| } | ||
| // End-of-batch witness validation | ||
| for i, block := range chain { | ||
|
|
@@ -3111,6 +3156,10 @@ | |
| if parent == nil { | ||
| parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) | ||
| } | ||
|
|
||
| // Start reader cache warming | ||
| warmCh := bc.StartWarmReaderCache(parent.Root, block) | ||
|
|
||
| statedb, err := state.New(parent.Root, bc.statedb) | ||
| if err != nil { | ||
| return nil, it.index, err | ||
|
|
@@ -3170,7 +3219,11 @@ | |
| } | ||
| } | ||
|
|
||
| receipts, logs, usedGas, statedb, vtime, err := bc.ProcessBlock(block, parent, witness, &followupInterrupt) | ||
| var execWarmCh <-chan warmOutcome | ||
| if bc.cfg.WaitForWarm { | ||
| execWarmCh = warmCh | ||
| } | ||
| receipts, logs, usedGas, statedb, vtime, err := bc.ProcessBlock(block, parent, witness, &followupInterrupt, execWarmCh) | ||
| bc.statedb.TrieDB().SetReadBackend(nil) | ||
| bc.statedb.EnableSnapInReader() | ||
| activeState = statedb | ||
|
|
@@ -3208,6 +3261,15 @@ | |
| blockExecutionTimer.Update(ptime - trieRead) // The time spent on EVM processing | ||
| blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation | ||
| borConsensusTime.Update(statedb.BorConsensusTime) // The time spent on bor consensus (span + state sync) | ||
|
|
||
| execOnly := (ptime - trieRead) | ||
| stateCalc := (triehash + trieUpdate + trieRead) | ||
| stats.execDur += execOnly | ||
| stats.stateCalcDur += stateCalc | ||
| if bc.lastWarmSeedDur > 0 { | ||
| stats.warmDur += bc.lastWarmSeedDur | ||
| } | ||
| stats.valDur += (vtime - (triehash + trieUpdate)) | ||
| // Write the block to the chain and get the status. | ||
| var ( | ||
| wstart = time.Now() | ||
|
|
@@ -3260,7 +3322,7 @@ | |
| snapDiffItems, snapBufItems = bc.snaps.Size() | ||
| } | ||
| trieDiffNodes, trieBufNodes, _ := bc.triedb.Size() | ||
| stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead, false) | ||
| stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead, false, bc) | ||
|
|
||
| /* | ||
| // Print confirmation that a future fork is scheduled, but not yet active. | ||
|
|
@@ -4297,3 +4359,61 @@ | |
| } | ||
| } | ||
| } | ||
|
|
||
| type warmOutcome struct { | ||
| Reader state.ReaderWithStats | ||
| Dur time.Duration | ||
| } | ||
|
|
||
| // StartWarmReaderCache launches a goroutine to warm up a state reader's cache | ||
| func (bc *BlockChain) StartWarmReaderCache(parentRoot common.Hash, b *types.Block) <-chan warmOutcome { | ||
| ch := make(chan warmOutcome, 1) | ||
| if b == nil || len(b.Transactions()) == 0 || bc.prefetcher == nil { | ||
| close(ch) | ||
| return ch | ||
| } | ||
| go func() { | ||
| // Build process reader and warm it | ||
| _, process, err := bc.statedb.ReadersWithCacheStats(parentRoot) | ||
| if err == nil && process != nil { | ||
| if warmed, dur := bc.warmReaderCache(parentRoot, process, b, nil); warmed != nil { | ||
| ch <- warmOutcome{Reader: warmed, Dur: dur} | ||
| close(ch) | ||
| return | ||
| } | ||
| } | ||
| close(ch) | ||
| }() | ||
| return ch | ||
| } | ||
|
|
||
| // WaitForWarmEnabled reports whether execution should wait for warm-up to finish. | ||
| func (bc *BlockChain) WaitForWarmEnabled() bool { | ||
| if bc.cfg == nil { | ||
| return false | ||
| } | ||
| return bc.cfg.WaitForWarm | ||
| } | ||
|
|
||
| // NewStateWithReader creates a new StateDB bound to the given reader for the root. | ||
| func (bc *BlockChain) NewStateWithReader(root common.Hash, rdr state.ReaderWithStats) (*state.StateDB, error) { | ||
| return state.NewWithReader(root, bc.statedb, rdr) | ||
| } | ||
|
|
||
| // PrefetchFromTxpool launches background prefetching for pending txs against a header root. | ||
| func (bc *BlockChain) PrefetchFromTxpool(header *types.Header, txs []*types.Transaction) { | ||
| if header == nil || len(txs) == 0 || bc.prefetcher == nil { | ||
| return | ||
| } | ||
| go func() { | ||
| warmdb, err := state.New(header.Root, bc.statedb) | ||
| if err != nil { | ||
| return | ||
| } | ||
| vmCfg := bc.cfg.VmConfig | ||
| vmCfg.Tracer = nil | ||
| synthetic := types.NewBlockWithHeader(header).WithBody(types.Body{Transactions: txs}) | ||
| var noop atomic.Bool | ||
| bc.prefetcher.Prefetch(synthetic, warmdb, vmCfg, &noop) | ||
|
||
| }() | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be excluded from this PR I guess?