diff --git a/db/state/aggregator.go b/db/state/aggregator.go
index 5b1f6e27749..d26a3fbf4a8 100644
--- a/db/state/aggregator.go
+++ b/db/state/aggregator.go
@@ -48,6 +48,7 @@ import (
 	"github.com/erigontech/erigon/db/kv"
 	"github.com/erigontech/erigon/db/kv/bitmapdb"
 	"github.com/erigontech/erigon/db/kv/order"
+	"github.com/erigontech/erigon/db/kv/rawdbv3"
 	"github.com/erigontech/erigon/db/kv/stream"
 	"github.com/erigontech/erigon/db/state/statecfg"
 	"github.com/erigontech/erigon/db/version"
@@ -62,6 +63,7 @@ type Aggregator struct {
 	dirs                     datadir.Dirs
 	stepSize                 uint64
+	reorgBlockDepth          uint64
 	dirtyFilesLock           sync.Mutex
 	visibleFilesLock         sync.RWMutex
 	visibleFilesMinimaxTxNum atomic.Uint64
@@ -95,7 +97,7 @@ type Aggregator struct {
 	checker *DependencyIntegrityChecker
 }
 
-func newAggregator(ctx context.Context, dirs datadir.Dirs, stepSize uint64, db kv.RoDB, logger log.Logger) (*Aggregator, error) {
+func newAggregator(ctx context.Context, dirs datadir.Dirs, stepSize, reorgBlockDepth uint64, db kv.RoDB, logger log.Logger) (*Aggregator, error) {
 	ctx, ctxCancel := context.WithCancel(ctx)
 	return &Aggregator{
 		ctx: ctx,
@@ -104,6 +106,7 @@ func newAggregator(ctx context.Context, dirs datadir.Dirs, stepSize uint64, db k
 		onFilesDelete:   func(frozenFileNames []string) {},
 		dirs:            dirs,
 		stepSize:        stepSize,
+		reorgBlockDepth: reorgBlockDepth,
 		db:              db,
 		leakDetector:    dbg.NewLeakDetector("agg", dbg.SlowTx()),
 		ps:              background.NewProgressSet(),
@@ -619,7 +622,17 @@ func (sf AggV3StaticFiles) CleanupOnError() {
 	}
 }
 
+var errStepNotReady = errors.New("step not ready")
+
 func (a *Aggregator) buildFiles(ctx context.Context, step kv.Step) error {
+	lastBlockInStep, lastBlockInDB, lastTxInDB, ok, err := a.readyForCollation(ctx, step)
+	if err != nil {
+		return err
+	}
+	if !ok {
+		a.logger.Debug("[agg] step not ready for collation", "step", step, "lastTxInStep", lastTxNumOfStep(step, a.StepSize()), "lastBlockInStep", lastBlockInStep, "lastTxInDB", lastTxInDB, "lastBlockInDB", lastBlockInDB)
+		return errStepNotReady
+	}
 	a.logger.Debug("[agg] collate and build", "step", step, "collate_workers", a.collateAndBuildWorkers, "merge_workers", a.mergeWorkers, "compress_workers", a.d[kv.AccountsDomain].CompressCfg.Workers)
 
 	var (
@@ -738,6 +751,25 @@ func (a *Aggregator) buildFiles(ctx context.Context, step kv.Step) error {
 	return nil
 }
 
+func (a *Aggregator) readyForCollation(ctx context.Context, step kv.Step) (lastBlockInStep, lastBlockInDB, lastTxInDB uint64, ok bool, err error) {
+	if a.reorgBlockDepth == 0 {
+		return 0, 0, 0, true, nil
+	}
+	err = a.db.View(ctx, func(tx kv.Tx) error {
+		lastBlockInStep, ok, err = rawdbv3.TxNums.FindBlockNum(tx, lastTxNumOfStep(step, a.stepSize))
+		if err != nil {
+			return err
+		}
+		if !ok {
+			lastBlockInStep = 0
+		}
+		lastTxInDB, lastBlockInDB, err = rawdbv3.TxNums.Last(tx)
+		return err
+	})
+	ok = err == nil && lastBlockInDB > lastBlockInStep+a.reorgBlockDepth
+	return
+}
+
 func (a *Aggregator) BuildFiles(toTxNum uint64) (err error) {
 	finished := a.BuildFilesInBackground(toTxNum)
 	if !(a.buildingFiles.Load() || a.mergingFiles.Load()) {
@@ -780,6 +812,9 @@ func (a *Aggregator) BuildFiles2(ctx context.Context, fromStep, toStep kv.Step)
 	}
 	for step := fromStep; step < toStep; step++ { //`step` must be fully-written - means `step+1` records must be visible
 		if err := a.buildFiles(ctx, step); err != nil {
+			if errors.Is(err, errStepNotReady) {
+				break
+			}
 			if errors.Is(err, context.Canceled) || errors.Is(err, common.ErrStopped) {
 				panic(err)
 			}
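A minimal standalone sketch of the gating rule introduced above (illustrative names, not the Erigon API): a step is only collated once the DB holds more than reorgBlockDepth blocks past the last block of that step, so frozen files never include blocks that could still be reorged.

package main

import "fmt"

// lastTxNumOfStep mirrors the step-to-txNum arithmetic assumed above:
// step N covers txNums [N*stepSize, (N+1)*stepSize-1].
func lastTxNumOfStep(step, stepSize uint64) uint64 {
	return (step+1)*stepSize - 1
}

// readyForCollation is a simplified stand-in for Aggregator.readyForCollation:
// with reorgBlockDepth == 0 the check is disabled, otherwise the DB tip must be
// more than reorgBlockDepth blocks past the last block of the step.
func readyForCollation(lastBlockInStep, lastBlockInDB, reorgBlockDepth uint64) bool {
	if reorgBlockDepth == 0 {
		return true
	}
	return lastBlockInDB > lastBlockInStep+reorgBlockDepth
}

func main() {
	// Example: step 3 with a 1000-tx step ends at txNum 3999; suppose that
	// txNum falls into block 180 and the DB tip is block 200.
	fmt.Println(lastTxNumOfStep(3, 1000))        // 3999
	fmt.Println(readyForCollation(180, 200, 64)) // false: only 20 blocks of headroom
	fmt.Println(readyForCollation(180, 260, 64)) // true: 80 > 64
}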
@@ -915,12 +950,12 @@ type flusher interface {
 	Flush(ctx context.Context, tx kv.RwTx) error
 }
 
-func (at *AggregatorRoTx) StepsInFiles(entitySet ...kv.Domain) uint64 {
+func (at *AggregatorRoTx) StepsInFiles(entitySet ...kv.Domain) kv.Step {
 	txNumInFiles := at.TxNumsInFiles(entitySet...)
 	if txNumInFiles > 0 {
 		txNumInFiles--
 	}
-	return txNumInFiles / at.StepSize()
+	return kv.Step(txNumInFiles / at.StepSize())
 }
 
 func (at *AggregatorRoTx) TxNumsInFiles(entitySet ...kv.Domain) (minTxNum uint64) {
@@ -954,6 +989,47 @@ func (at *AggregatorRoTx) CanPrune(tx kv.Tx, untilTx uint64) bool {
 	return false
 }
 
+// IIBacklogInfo returns the maximum backlog across inverted indexes and diagnostic info.
+// Backlog = txNums in MDBX that should be pruned (below files.EndTxNum).
+// Also returns list of IIs with visibility issues (EndTxNum = 0).
+func (at *AggregatorRoTx) IIBacklogInfo(tx kv.Tx) (maxBacklog uint64, blockedIIs []string) {
+	for _, ii := range at.iis {
+		endTxNum := ii.files.EndTxNum()
+		if endTxNum == 0 {
+			// No visible files - can't prune this II
+			blockedIIs = append(blockedIIs, ii.ii.FilenameBase)
+			continue
+		}
+		minTxNum := ii.ii.minTxNumInDB(tx)
+		if minTxNum < endTxNum {
+			backlog := endTxNum - minTxNum
+			if backlog > maxBacklog {
+				maxBacklog = backlog
+			}
+		}
+	}
+	return maxBacklog, blockedIIs
+}
+
+// CommitmentBacklogInfo returns the backlog for CommitmentDomain history pruning.
+// Backlog = txNums in MDBX that could be pruned (history entries below files.EndTxNum).
+// Returns 0 if no pruning is possible or history is disabled.
+func (at *AggregatorRoTx) CommitmentBacklogInfo(tx kv.Tx) uint64 {
+	cd := at.d[kv.CommitmentDomain]
+	if cd.ht.h.HistoryDisabled {
+		return 0
+	}
+	canHist, txTo := cd.ht.canPruneUntil(tx, math.MaxUint64)
+	if !canHist || txTo == 0 {
+		return 0
+	}
+	minTxNum := cd.ht.iit.ii.minTxNumInDB(tx)
+	if minTxNum >= txTo {
+		return 0
+	}
+	return txTo - minTxNum
+}
+
 // PruneSmallBatches is not cancellable, it's over when it's over or failed.
 // It fills whole timeout with pruning by small batches (of 100 keys) and making some progress
 func (at *AggregatorRoTx) PruneSmallBatches(ctx context.Context, timeout time.Duration, tx kv.RwTx) (haveMore bool, err error) {
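A self-contained sketch of the backlog computation that IIBacklogInfo performs, using hypothetical types rather than the real AggregatorRoTx internals: the backlog of one index is how far the oldest txNum still in MDBX lags behind the end of its visible files, and indexes with no visible files are reported separately as blocked.

package main

import "fmt"

// iiState is a hypothetical stand-in for one inverted index:
// endTxNumInFiles plays the role of files.EndTxNum(), minTxNumInDB the oldest txNum still in MDBX.
type iiState struct {
	name            string
	endTxNumInFiles uint64
	minTxNumInDB    uint64
}

// backlogInfo mirrors the shape of IIBacklogInfo: max backlog plus the names
// of indexes that cannot be pruned because they have no visible files.
func backlogInfo(iis []iiState) (maxBacklog uint64, blocked []string) {
	for _, ii := range iis {
		if ii.endTxNumInFiles == 0 {
			blocked = append(blocked, ii.name)
			continue
		}
		if ii.minTxNumInDB < ii.endTxNumInFiles {
			if b := ii.endTxNumInFiles - ii.minTxNumInDB; b > maxBacklog {
				maxBacklog = b
			}
		}
	}
	return maxBacklog, blocked
}

func main() {
	iis := []iiState{
		{"logaddrs", 12_000_000, 1_500_000}, // 10.5M txNums of prunable history
		{"logtopics", 12_000_000, 11_900_000},
		{"tracesfrom", 0, 1_500_000}, // no visible files yet
	}
	backlog, blocked := backlogInfo(iis)
	fmt.Println(backlog, blocked) // 10500000 [tracesfrom]
}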
@@ -1614,6 +1690,9 @@ func (a *Aggregator) BuildFilesInBackground(txNum uint64) chan struct{} {
 		// - during files build, may happen commit of new data. on each loop step getting latest id in db
 		for ; step < lastInDB; step++ { //`step` must be fully-written - means `step+1` records must be visible
 			if err := a.buildFiles(a.ctx, step); err != nil {
+				if errors.Is(err, errStepNotReady) {
+					break
+				}
 				if errors.Is(err, context.Canceled) || errors.Is(err, common.ErrStopped) {
 					close(fin)
 					return
diff --git a/execution/stagedsync/exec3.go b/execution/stagedsync/exec3.go
index 5e4f152f25f..9f830ff1112 100644
--- a/execution/stagedsync/exec3.go
+++ b/execution/stagedsync/exec3.go
@@ -66,8 +66,11 @@ var (
 	mxMgas = metrics.NewGauge(`exec_mgas`)
 )
 
+// Track prune modes for logging only on transitions
+var lastIIPruneMode string
+var lastCommitmentPruneMode string
+
 const (
-	changesetSafeRange     = 32   // Safety net for long-sync, keep last 32 changesets
 	maxUnwindJumpAllowance = 1000 // Maximum number of blocks we are allowed to unwind
 )
 
@@ -294,17 +297,14 @@ func ExecV3(ctx context.Context,
 		return nil
 	}
 
-	shouldGenerateChangesets := maxBlockNum-blockNum <= changesetSafeRange || cfg.syncCfg.AlwaysGenerateChangesets
-	if blockNum < cfg.blockReader.FrozenBlocks() {
-		shouldGenerateChangesets = false
-	}
-
 	if maxBlockNum > blockNum+16 {
 		log.Info(fmt.Sprintf("[%s] starting", execStage.LogPrefix()),
 			"from", blockNum, "to", maxBlockNum, "fromTxNum", doms.TxNum(), "offsetFromBlockBeginning", offsetFromBlockBeginning, "initialCycle", initialCycle, "useExternalTx", useExternalTx, "inMem", inMemExec)
 	}
 
-	agg.BuildFilesInBackground(outputTxNum.Load())
+	if execStage.SyncMode() == stages.ModeApplyingBlocks {
+		agg.BuildFilesInBackground(outputTxNum.Load())
+	}
 
 	var count uint64
 
@@ -372,24 +372,26 @@ func ExecV3(ctx context.Context,
 				accumulator:    accumulator,
 				isMining:       isMining,
 				inMemExec:      inMemExec,
+				initialCycle:   initialCycle,
 				applyTx:        applyTx,
 				applyWorker:    applyWorker,
+				inputBlockNum:  inputBlockNum,
+				maxBlockNum:    maxBlockNum,
 				outputTxNum:    &outputTxNum,
 				outputBlockNum: stages.SyncMetrics[stages.Execution],
 				logger:         logger,
 			},
-			shouldGenerateChangesets: shouldGenerateChangesets,
-			workerCount:              workerCount,
-			pruneEvery:               pruneEvery,
-			logEvery:                 logEvery,
-			progress:                 progress,
+			workerCount: workerCount,
+			pruneEvery:  pruneEvery,
+			logEvery:    logEvery,
+			progress:    progress,
 		}
 
 		executorCancel := pe.run(ctx, maxTxNum, logger)
 		defer executorCancel()
 
 		defer func() {
-			progress.Log("Done", executor.readState(), nil, pe.rws, 0 /*txCount - TODO*/, logGas, inputBlockNum.Load(), outputBlockNum.GetValueUint64(), outputTxNum.Load(), mxExecRepeats.GetValueUint64(), stepsInDB, shouldGenerateChangesets, inMemExec)
+			progress.Log("Done", executor.readState(), nil, pe.rws, 0 /*txCount - TODO*/, logGas, inputBlockNum.Load(), outputBlockNum.GetValueUint64(), outputTxNum.Load(), mxExecRepeats.GetValueUint64(), stepsInDB, pe.shouldGenerateChangeSets(), inMemExec)
 		}()
 
 		executor = pe
@@ -406,8 +408,11 @@ func ExecV3(ctx context.Context,
 			u:              u,
 			isMining:       isMining,
 			inMemExec:      inMemExec,
+			initialCycle:   initialCycle,
 			applyTx:        applyTx,
 			applyWorker:    applyWorker,
+			inputBlockNum:  inputBlockNum,
+			maxBlockNum:    maxBlockNum,
 			outputTxNum:    &outputTxNum,
 			outputBlockNum: stages.SyncMetrics[stages.Execution],
 			logger:         logger,
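The errStepNotReady handling added to the aggregator's build loops above relies on the standard sentinel-error pattern: once a step is not ready, no later step can be ready either, so the loop stops rather than treating it as a failure. A minimal illustration (buildStep and the step values are made up):

package main

import (
	"errors"
	"fmt"
)

var errStepNotReady = errors.New("step not ready")

// buildStep is a stand-in for Aggregator.buildFiles: it refuses steps whose
// data is not yet safely behind the reorg window.
func buildStep(step, readyUpTo uint64) error {
	if step >= readyUpTo {
		return fmt.Errorf("step %d: %w", step, errStepNotReady)
	}
	fmt.Println("built step", step)
	return nil
}

func main() {
	for step := uint64(0); step < 10; step++ {
		if err := buildStep(step, 4); err != nil {
			if errors.Is(err, errStepNotReady) {
				// Not a failure: later steps cannot be ready either,
				// so stop here and retry on a future call.
				break
			}
			panic(err)
		}
	}
}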
@@ -415,7 +420,7 @@ func ExecV3(ctx context.Context,
 		}
 
 		defer func() {
-			progress.Log("Done", executor.readState(), nil, nil, se.txCount, logGas, inputBlockNum.Load(), outputBlockNum.GetValueUint64(), outputTxNum.Load(), mxExecRepeats.GetValueUint64(), stepsInDB, shouldGenerateChangesets || cfg.syncCfg.KeepExecutionProofs, inMemExec)
+			progress.Log("Done", executor.readState(), nil, nil, se.txCount, logGas, inputBlockNum.Load(), outputBlockNum.GetValueUint64(), outputTxNum.Load(), mxExecRepeats.GetValueUint64(), stepsInDB, se.shouldGenerateChangeSets() || cfg.syncCfg.KeepExecutionProofs, inMemExec)
 		}()
 
 		executor = se
@@ -436,7 +441,9 @@ func ExecV3(ctx context.Context,
 			"from", blockNum, "to", maxBlockNum, "fromTxNum", executor.domains().TxNum(), "offsetFromBlockBeginning", offsetFromBlockBeginning, "initialCycle", initialCycle, "useExternalTx", useExternalTx)
 	}
 
-	agg.BuildFilesInBackground(outputTxNum.Load())
+	if execStage.SyncMode() == stages.ModeApplyingBlocks {
+		agg.BuildFilesInBackground(outputTxNum.Load())
+	}
 
 	var readAhead chan uint64
 	if !isMining && !inMemExec && execStage.CurrentSyncCycle.IsInitialCycle {
@@ -450,27 +457,23 @@ func ExecV3(ctx context.Context,
 	}
 
 	var b *types.Block
-
-	// Only needed by bor chains
-	shouldGenerateChangesetsForLastBlocks := cfg.chainConfig.Bor != nil
 	startBlockNum := blockNum
 	blockLimit := uint64(cfg.syncCfg.LoopBlockLimit)
 	var errExhausted *ErrLoopExhausted
+	var lastFrozenTxNum uint64
+	var lastFrozenStep kv.Step
+
+	if temporalTx, ok := applyTx.(kv.TemporalTx); ok {
+		lastFrozenStep = temporalTx.StepsInFiles(kv.CommitmentDomain)
+	}
+	if lastFrozenStep > 0 {
+		lastFrozenTxNum = uint64((lastFrozenStep+1)*kv.Step(doms.StepSize())) - 1
+	}
+
 Loop:
 	for ; blockNum <= maxBlockNum; blockNum++ {
-		// set shouldGenerateChangesets=true if we are at last n blocks from maxBlockNum. this is as a safety net in chains
-		// where during initial sync we can expect bogus blocks to be imported.
-		if !shouldGenerateChangesets && shouldGenerateChangesetsForLastBlocks && blockNum > cfg.blockReader.FrozenBlocks() && blockNum+changesetSafeRange >= maxBlockNum {
-			start := time.Now()
-			executor.domains().SetChangesetAccumulator(nil) // Make sure we don't have an active changeset accumulator
-			// First compute and commit the progress done so far
-			if _, err := executor.domains().ComputeCommitment(ctx, true, blockNum, inputTxNum, execStage.LogPrefix()); err != nil {
-				return err
-			}
-			computeCommitmentDuration += time.Since(start)
-			shouldGenerateChangesets = true // now we can generate changesets for the safety net
-		}
+		shouldGenerateChangesets := shouldGenerateChangeSets(cfg, blockNum, maxBlockNum, initialCycle)
 		changeSet := &changeset2.StateChangeSet{}
 		if shouldGenerateChangesets && blockNum > 0 {
 			executor.domains().SetChangesetAccumulator(changeSet)
@@ -570,7 +573,7 @@ Loop:
 			Withdrawals: b.Withdrawals(),
 
 			// use history reader instead of state reader to catch up to the tx where we left off
-			HistoryExecution: offsetFromBlockBeginning > 0 && txIndex < int(offsetFromBlockBeginning),
+			HistoryExecution: lastFrozenTxNum > 0 && inputTxNum <= lastFrozenTxNum,
 
 			BlockReceipts: blockReceipts,
@@ -656,7 +659,9 @@ Loop:
 				return err
 			}
 
-			agg.BuildFilesInBackground(outputTxNum.Load())
+			if execStage.SyncMode() == stages.ModeApplyingBlocks {
+				agg.BuildFilesInBackground(outputTxNum.Load())
+			}
 		} else {
 			se := executor.(*serialExecutor)
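A sketch of the new catch-up rule behind the HistoryExecution change above, under the same assumption the hunk makes that frozen step N covers txNums up to (N+1)*stepSize-1: transactions at or below the last frozen txNum are replayed through the history reader, everything after runs against live state. Names here are illustrative, not the Erigon API.

package main

import "fmt"

// lastFrozenTxNum converts the highest step already frozen into files into the
// last txNum covered by those files; as in the hunk above, a zero step is
// treated as "nothing frozen".
func lastFrozenTxNum(lastFrozenStep, stepSize uint64) uint64 {
	if lastFrozenStep == 0 {
		return 0
	}
	return (lastFrozenStep+1)*stepSize - 1
}

// useHistoryExecution mirrors the HistoryExecution condition in the diff.
func useHistoryExecution(inputTxNum, frozenTxNum uint64) bool {
	return frozenTxNum > 0 && inputTxNum <= frozenTxNum
}

func main() {
	frozen := lastFrozenTxNum(2, 1000) // steps 0..2 frozen -> txNums 0..2999
	fmt.Println(frozen)                            // 2999
	fmt.Println(useHistoryExecution(2999, frozen)) // true: still inside the frozen range
	fmt.Println(useHistoryExecution(3000, frozen)) // false: first txNum past the files
}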
@@ -763,12 +768,70 @@ Loop:
 			// allow greedy prune on non-chain-tip
 			pruneTimeout := 250 * time.Millisecond
+			iiPruneMode := "normal"
 			if initialCycle {
 				pruneTimeout = 10 * time.Hour
+				iiPruneMode = "initialCycle"
 				if err = executor.tx().(kv.TemporalRwTx).GreedyPruneHistory(ctx, kv.CommitmentDomain); err != nil {
 					return err
 				}
+			} else {
+				// Check for II backlog - enable aggressive prune if needed
+				iiBacklog, blockedIIs := aggregatorRo.IIBacklogInfo(executor.tx())
+				if len(blockedIIs) > 0 {
+					iiPruneMode = "blocked"
+				} else if iiBacklog > 10_000_000 { // >10M txNums behind (~500 steps)
+					pruneTimeout = 30 * time.Minute
+					iiPruneMode = "aggressive"
+				} else if iiBacklog > 1_000_000 { // >1M txNums behind (~50 steps)
+					pruneTimeout = 10 * time.Minute
+					iiPruneMode = "medium"
+				}
+
+				// Check for CommitmentDomain history backlog - call GreedyPruneHistory if behind
+				commitmentBacklog := aggregatorRo.CommitmentBacklogInfo(executor.tx())
+				commitmentPruneMode := "normal"
+				if commitmentBacklog > 10_000_000 { // >10M txNums behind
+					commitmentPruneMode = "aggressive"
+					if err = executor.tx().(kv.TemporalRwTx).GreedyPruneHistory(ctx, kv.CommitmentDomain); err != nil {
+						return err
+					}
+				}
+
+				// Log commitment prune mode transitions
+				if commitmentPruneMode != lastCommitmentPruneMode {
+					prevMode := lastCommitmentPruneMode
+					lastCommitmentPruneMode = commitmentPruneMode
+					switch commitmentPruneMode {
+					case "aggressive":
+						logger.Info("[OtterSync] Commitment prune backlog: aggressive", "backlog", commitmentBacklog)
+					case "normal":
+						if prevMode == "aggressive" {
+							logger.Info("[OtterSync] Commitment prune backlog: off")
+						}
+					}
+				}
+			}
+
+			// Log only on mode transitions
+			if iiPruneMode != lastIIPruneMode {
+				prevMode := lastIIPruneMode
+				lastIIPruneMode = iiPruneMode
+				switch iiPruneMode {
+				case "blocked":
+					iiBacklog, blockedIIs := aggregatorRo.IIBacklogInfo(executor.tx())
+					logger.Warn("[OtterSync] II prune blocked - no visible files",
+						"blockedIIs", blockedIIs, "backlog", iiBacklog)
+				case "aggressive":
+					logger.Info("[OtterSync] II prune backlog: aggressive", "timeout", pruneTimeout)
+				case "medium":
+					logger.Info("[OtterSync] II prune backlog: medium", "timeout", pruneTimeout)
+				case "normal":
+					if prevMode == "aggressive" || prevMode == "medium" || prevMode == "blocked" {
+						logger.Info("[OtterSync] II prune backlog: off")
+					}
+				}
 			}
 
 			if _, err := aggregatorRo.PruneSmallBatches(ctx, pruneTimeout, executor.tx()); err != nil {
@@ -800,16 +863,22 @@ Loop:
 			}
 		}
 
-		if blockLimit > 0 && blockNum-startBlockNum+1 >= blockLimit {
-			errExhausted = &ErrLoopExhausted{From: startBlockNum, To: blockNum, Reason: "block limit reached"}
-			break
-		}
-
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		default:
 		}
+
+		lastExecutedStep := kv.Step(inputTxNum / agg.StepSize())
+
+		// if we're in the initialCycle, before we consider the blockLimit we need to make sure we keep executing
+		// until we reach a transaction whose commitment is writable to the db, otherwise the update will get lost
+		if !initialCycle || lastExecutedStep > 0 && lastExecutedStep > lastFrozenStep && !dbg.DiscardCommitment() {
+			if blockLimit > 0 && blockNum-startBlockNum+1 >= blockLimit {
+				errExhausted = &ErrLoopExhausted{From: startBlockNum, To: blockNum, Reason: "block limit reached"}
+				break
+			}
+		}
 	}
 
 	//log.Info("Executed", "blocks", inputBlockNum.Load(), "txs", outputTxNum.Load(), "repeats", mxExecRepeats.GetValueUint64())
@@ -836,7 +905,9 @@ Loop:
 		}
 	}
 
-	agg.BuildFilesInBackground(outputTxNum.Load())
+	if execStage.SyncMode() == stages.ModeApplyingBlocks {
+		agg.BuildFilesInBackground(outputTxNum.Load())
+	}
 
 	if errExhausted != nil && blockNum < maxBlockNum {
 		// special err allows the loop to continue, caller will call us again to continue from where we left off
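The backlog-to-mode mapping and the log-only-on-transition pattern used above, reduced to a standalone sketch (the 1M and 10M thresholds are copied from the hunk, everything else is illustrative):

package main

import (
	"fmt"
	"time"
)

var lastMode string

// pruneMode maps an inverted-index backlog (in txNums) to a prune budget,
// using the same 1M / 10M thresholds as the diff above.
func pruneMode(backlog uint64) (mode string, timeout time.Duration) {
	switch {
	case backlog > 10_000_000:
		return "aggressive", 30 * time.Minute
	case backlog > 1_000_000:
		return "medium", 10 * time.Minute
	default:
		return "normal", 250 * time.Millisecond
	}
}

// logTransition prints only when the mode changes, so steady-state loops stay quiet.
func logTransition(mode string, timeout time.Duration) {
	if mode == lastMode {
		return
	}
	lastMode = mode
	fmt.Printf("prune mode -> %s (timeout %s)\n", mode, timeout)
}

func main() {
	for _, backlog := range []uint64{500_000, 2_000_000, 2_500_000, 12_000_000, 0} {
		mode, timeout := pruneMode(backlog)
		logTransition(mode, timeout) // logs normal, medium, aggressive, normal; skips the repeated medium
	}
}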
@@ -1011,3 +1082,17 @@ func blockWithSenders(ctx context.Context, db kv.RoDB, tx kv.Tx, blockReader ser
 	}
 	return b, err
 }
+
+func shouldGenerateChangeSets(cfg ExecuteBlockCfg, blockNum, maxBlockNum uint64, initialCycle bool) bool {
+	if cfg.syncCfg.AlwaysGenerateChangesets {
+		return true
+	}
+	if blockNum < cfg.blockReader.FrozenBlocks() {
+		return false
+	}
+	if initialCycle {
+		return false
+	}
+	// once past the initial cycle, make sure to generate changesets for the last blocks that fall in the reorg window
+	return blockNum+cfg.syncCfg.MaxReorgDepth >= maxBlockNum
+}
diff --git a/execution/stagedsync/stage_execute.go b/execution/stagedsync/stage_execute.go
index b438a9e7341..5032d37e765 100644
--- a/execution/stagedsync/stage_execute.go
+++ b/execution/stagedsync/stage_execute.go
@@ -18,6 +18,7 @@ package stagedsync
 
 import (
 	"context"
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"math"
@@ -231,6 +232,9 @@ func unwindExec3(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx contex
 
 var mxState3Unwind = metrics.GetOrCreateSummary("state3_unwind")
 
+// Track ChangeSets3 prune mode for logging only on transitions
+var lastChangeSetsPruneMode string
+
 func unwindExec3State(ctx context.Context, tx kv.TemporalRwTx, sd *state.SharedDomains,
 	blockUnwindTo, txUnwindTo uint64, accumulator *shards.Accumulator,
@@ -443,10 +447,52 @@ func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx con
 	// Some blocks on bor-mainnet have 400 chunks of diff = 3mb
 	var pruneDiffsLimitOnChainTip = 1_000
 	pruneTimeout := quickPruneTimeout
+	changeSetsPruneMode := "normal"
+
 	if s.CurrentSyncCycle.IsInitialCycle {
 		pruneDiffsLimitOnChainTip = math.MaxInt
-		pruneTimeout = time.Hour
+		pruneTimeout = 5 * time.Minute
+		changeSetsPruneMode = "initialCycle"
+	} else {
+		// Check for ChangeSets3 backlog - enable aggressive prune if behind
+		pruneTo := s.ForwardProgress - cfg.syncCfg.MaxReorgDepth
+		if c, err := tx.Cursor(kv.ChangeSets3); err == nil {
+			k, _, _ := c.First()
+			c.Close()
+			if k != nil {
+				minBlock := binary.BigEndian.Uint64(k)
+				if minBlock < pruneTo {
+					backlog := pruneTo - minBlock
+					if backlog > 1_000_000 { // >1M blocks behind
+						pruneDiffsLimitOnChainTip = math.MaxInt
+						pruneTimeout = 5 * time.Minute
+						changeSetsPruneMode = "aggressive"
+					} else if backlog > 100_000 { // >100K blocks behind
+						pruneDiffsLimitOnChainTip = 100_000
+						pruneTimeout = 5 * time.Minute
+						changeSetsPruneMode = "medium"
+					}
+				}
+			}
+		}
 	}
+
+	// Log ChangeSets3 prune mode transitions
+	if changeSetsPruneMode != lastChangeSetsPruneMode {
+		prevMode := lastChangeSetsPruneMode
+		lastChangeSetsPruneMode = changeSetsPruneMode
+		switch changeSetsPruneMode {
+		case "aggressive":
+			logger.Info("[OtterSync] ChangeSets3 prune backlog: aggressive", "timeout", pruneTimeout)
+		case "medium":
+			logger.Info("[OtterSync] ChangeSets3 prune backlog: medium", "timeout", pruneTimeout)
+		case "normal":
+			if prevMode == "aggressive" || prevMode == "medium" {
+				logger.Info("[OtterSync] ChangeSets3 prune backlog: off")
+			}
+		}
+	}
+
 	pruneChangeSetsStartTime := time.Now()
 	if err := rawdb.PruneTable(
 		tx,
@@ -461,7 +507,7 @@ func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx con
 		return err
 	}
 	if duration := time.Since(pruneChangeSetsStartTime); duration > quickPruneTimeout {
-		logger.Debug(
+		logger.Info(
 			fmt.Sprintf("[%s] prune changesets timing", s.LogPrefix()),
 			"duration", duration,
 			"initialCycle", s.CurrentSyncCycle.IsInitialCycle,
@@ -474,7 +520,7 @@ func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx con
 	pruneTimeout := quickPruneTimeout
 	if s.CurrentSyncCycle.IsInitialCycle {
-		pruneTimeout = 12 * time.Hour
+		pruneTimeout = 5 * time.Minute
 
 		// allow greedy prune on non-chain-tip
 		greedyPruneCommitmentHistoryStartTime := time.Now()
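A standalone restatement of the shouldGenerateChangeSets policy added to exec3.go above, with a hypothetical flat config instead of ExecuteBlockCfg: changesets are skipped for blocks already in frozen files and for the whole initial cycle, and are otherwise generated only for blocks inside the trailing reorg window.

package main

import "fmt"

// cfg is a hypothetical flattening of the fields the real function reads from
// ExecuteBlockCfg (syncCfg.AlwaysGenerateChangesets, blockReader.FrozenBlocks(),
// syncCfg.MaxReorgDepth).
type cfg struct {
	alwaysGenerateChangesets bool
	frozenBlocks             uint64
	maxReorgDepth            uint64
}

func shouldGenerateChangeSets(c cfg, blockNum, maxBlockNum uint64, initialCycle bool) bool {
	if c.alwaysGenerateChangesets {
		return true
	}
	if blockNum < c.frozenBlocks {
		return false // block is already immutable in snapshot files
	}
	if initialCycle {
		return false // long forward sync: no unwind expected yet
	}
	// at the tip, keep changesets for every block that could still be unwound
	return blockNum+c.maxReorgDepth >= maxBlockNum
}

func main() {
	c := cfg{frozenBlocks: 19_000_000, maxReorgDepth: 64}
	fmt.Println(shouldGenerateChangeSets(c, 19_500_000, 19_500_100, false)) // false: outside the reorg window
	fmt.Println(shouldGenerateChangeSets(c, 19_500_090, 19_500_100, false)) // true: within the last 64 blocks
}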
@@ -482,7 +528,7 @@ func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx con
 		return err
 	}
 	if duration := time.Since(greedyPruneCommitmentHistoryStartTime); duration > quickPruneTimeout {
-		logger.Debug(
+		logger.Info(
 			fmt.Sprintf("[%s] greedy prune commitment history timing", s.LogPrefix()),
 			"duration", duration,
 			"initialCycle", s.CurrentSyncCycle.IsInitialCycle,
@@ -496,7 +542,7 @@ func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx con
 		return err
 	}
 	if duration := time.Since(pruneSmallBatchesStartTime); duration > quickPruneTimeout {
-		logger.Debug(
+		logger.Info(
 			fmt.Sprintf("[%s] prune small batches timing", s.LogPrefix()),
 			"duration", duration,
 			"initialCycle", s.CurrentSyncCycle.IsInitialCycle,
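The ChangeSets3 backlog probe in PruneExecutionStage above only needs the first key of the table, because (as the hunk's own decoding shows) keys start with a big-endian block number, so the first cursor position is the oldest retained block. A small sketch of that decoding and the tier selection, with the cursor mocked and the thresholds taken from the hunk:

package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// firstKey stands in for Cursor.First() on ChangeSets3: an 8-byte
// big-endian block number, i.e. the oldest retained changeset.
func firstKey(oldestBlock uint64) []byte {
	k := make([]byte, 8)
	binary.BigEndian.PutUint64(k, oldestBlock)
	return k
}

// changeSetsPruneLimit mirrors the tiering in the diff: the further the oldest
// retained changeset lags behind pruneTo, the larger the per-call prune limit.
func changeSetsPruneLimit(k []byte, pruneTo uint64) (limit int, mode string) {
	limit, mode = 1_000, "normal"
	if k == nil {
		return limit, mode
	}
	minBlock := binary.BigEndian.Uint64(k)
	if minBlock >= pruneTo {
		return limit, mode
	}
	switch backlog := pruneTo - minBlock; {
	case backlog > 1_000_000:
		return math.MaxInt, "aggressive"
	case backlog > 100_000:
		return 100_000, "medium"
	}
	return limit, mode
}

func main() {
	limit, mode := changeSetsPruneLimit(firstKey(1_000_000), 1_200_000)
	fmt.Println(limit, mode) // 100000 medium: 200k blocks behind
}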