Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 5 additions & 19 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/c2h5oh/datasize"
"github.com/erigontech/mdbx-go/mdbx"
"github.com/erigontech/secp256k1"
lru "github.com/hashicorp/golang-lru/arc/v2"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -68,7 +67,6 @@ import (
"github.com/erigontech/erigon/execution/state/genesiswrite"
"github.com/erigontech/erigon/execution/tracing"
"github.com/erigontech/erigon/execution/types"
"github.com/erigontech/erigon/execution/types/accounts"
"github.com/erigontech/erigon/execution/vm"
"github.com/erigontech/erigon/node/debug"
"github.com/erigontech/erigon/node/eth"
Expand All @@ -81,7 +79,6 @@ import (
"github.com/erigontech/erigon/p2p"
"github.com/erigontech/erigon/p2p/sentry"
"github.com/erigontech/erigon/p2p/sentry/sentry_multi_client"
"github.com/erigontech/erigon/polygon/bor"
"github.com/erigontech/erigon/polygon/bor/borcfg"
"github.com/erigontech/erigon/polygon/bridge"
"github.com/erigontech/erigon/polygon/heimdall"
Expand Down Expand Up @@ -665,8 +662,8 @@ func stageBodies(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) err
}

u := sync.NewUnwindState(stages.Bodies, s.BlockNumber-unwind, s.BlockNumber, true, false)
cfg := stagedsync.StageBodiesCfg(db, nil, nil, nil, nil, 0, chainConfig, br, bw)
if err := stagedsync.UnwindBodiesStage(u, tx, cfg, ctx); err != nil {
cfg := stagedsync.StageBodiesCfg(nil, nil, nil, nil, 0, chainConfig, br, bw)
if err := stagedsync.UnwindBodiesStage(u, tx, cfg); err != nil {
return err
}

Expand Down Expand Up @@ -752,7 +749,7 @@ func stageSenders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) er
return err
}

cfg := stagedsync.StageSendersCfg(db, chainConfig, sync.Cfg(), false, tmpdir, pm, br, nil)
cfg := stagedsync.StageSendersCfg(chainConfig, sync.Cfg(), false /* badBlockHalt */, tmpdir, pm, br, nil /* hd */)
if unwind > 0 {
u := sync.NewUnwindState(stages.Senders, s.BlockNumber-unwind, s.BlockNumber, true, false)
if err = stagedsync.UnwindSendersStage(u, tx, cfg, ctx); err != nil {
Expand Down Expand Up @@ -996,7 +993,6 @@ func stageCustomTrace(db kv.TemporalRwDB, ctx context.Context, logger log.Logger
func stageTxLookup(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
_, _, _, sync := newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig := fromdb.ChainConfig(db)
must(sync.SetCurrentStage(stages.TxLookup))
if reset {
return db.Update(ctx, rawdbreset.ResetTxLookup)
Expand All @@ -1014,7 +1010,7 @@ func stageTxLookup(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) e
logger.Info("Stage", "name", s.ID, "progress", s.BlockNumber)

br, _ := blocksIO(db, logger)
cfg := stagedsync.StageTxLookupCfg(db, pm, dirs.Tmp, chainConfig.Bor, br)
cfg := stagedsync.StageTxLookupCfg(pm, dirs.Tmp, br)
if unwind > 0 {
u := sync.NewUnwindState(stages.TxLookup, s.BlockNumber-unwind, s.BlockNumber, true, false)
err = stagedsync.UnwindTxLookup(u, s, tx, cfg, ctx, logger)
Expand Down Expand Up @@ -1256,20 +1252,10 @@ func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *buildercfg.M
if err != nil {
panic(err)
}

notifications := shards.NewNotifications(nil)

var signatures *lru.ARCCache[common.Hash, accounts.Address]

if bor, ok := engine.(*bor.Bor); ok {
signatures = bor.Signatures
}
blockRetire := freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, db, heimdallStore, bridgeStore, chainConfig, &cfg, notifications.Events, blockSnapBuildSema, logger)

stageList := stageloop.NewDefaultStages(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, nil, nil,
signatures, logger, nil)
stageList := stageloop.NewDefaultStages(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, nil, nil, nil)
sync := stagedsync.New(cfg.Sync, stageList, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ModeApplyingBlocks)

return blockRetire, engine, vmConfig, sync
}

Expand Down
15 changes: 8 additions & 7 deletions execution/stagedsync/default_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func DefaultStages(ctx context.Context,
exec ExecuteBlockCfg,
txLookup TxLookupCfg,
finish FinishCfg,
test bool) []*Stage {
test bool,
) []*Stage {
return []*Stage{
{
ID: stages.Snapshots,
Expand Down Expand Up @@ -77,7 +78,7 @@ func DefaultStages(ctx context.Context,
return SpawnBlockHashStage(s, tx, blockHashCfg, ctx, logger)
},
Unwind: func(u *UnwindState, s *StageState, sd *execctx.SharedDomains, tx kv.TemporalRwTx, logger log.Logger) error {
return UnwindBlockHashStage(u, tx, blockHashCfg, ctx)
return UnwindBlockHashStage(u, tx)
},
Prune: func(ctx context.Context, p *PruneState, tx kv.RwTx, timeout time.Duration, logger log.Logger) error {
return nil
Expand All @@ -90,7 +91,7 @@ func DefaultStages(ctx context.Context,
return BodiesForward(s, u, ctx, tx, bodies, test, logger)
},
Unwind: func(u *UnwindState, s *StageState, sd *execctx.SharedDomains, tx kv.TemporalRwTx, logger log.Logger) error {
return UnwindBodiesStage(u, tx, bodies, ctx)
return UnwindBodiesStage(u, tx, bodies)
},
Prune: func(ctx context.Context, p *PruneState, tx kv.RwTx, timeout time.Duration, logger log.Logger) error {
return nil
Expand Down Expand Up @@ -195,7 +196,7 @@ func PipelineStages(ctx context.Context, snapshots SnapshotsCfg, blockHashCfg Bl
return SpawnBlockHashStage(s, tx, blockHashCfg, ctx, logger)
},
Unwind: func(u *UnwindState, s *StageState, sd *execctx.SharedDomains, tx kv.TemporalRwTx, logger log.Logger) error {
return UnwindBlockHashStage(u, tx, blockHashCfg, ctx)
return UnwindBlockHashStage(u, tx)
},
Prune: func(ctx context.Context, p *PruneState, tx kv.RwTx, timeout time.Duration, logger log.Logger) error {
return nil
Expand Down Expand Up @@ -234,7 +235,7 @@ func PipelineStages(ctx context.Context, snapshots SnapshotsCfg, blockHashCfg Bl
ID: stages.WitnessProcessing,
Description: "Process buffered witness data",
Forward: func(badBlockUnwind bool, s *StageState, u Unwinder, sd *execctx.SharedDomains, tx kv.TemporalRwTx, logger log.Logger) error {
return SpawnStageWitnessProcessing(s, tx, *witnessProcessing, ctx, logger)
return SpawnStageWitnessProcessing(tx, *witnessProcessing, logger)
},
Unwind: func(u *UnwindState, s *StageState, sd *execctx.SharedDomains, tx kv.TemporalRwTx, logger log.Logger) error {
return UnwindWitnessProcessingStage(u, s, tx, ctx, *witnessProcessing, logger)
Expand Down Expand Up @@ -297,7 +298,7 @@ func StateStages(ctx context.Context, headers HeadersCfg, bodies BodiesCfg, bloc
return nil
},
Unwind: func(u *UnwindState, s *StageState, sd *execctx.SharedDomains, tx kv.TemporalRwTx, logger log.Logger) error {
return UnwindBodiesStage(u, tx, bodies, ctx)
return UnwindBodiesStage(u, tx, bodies)
},
},
{
Expand All @@ -307,7 +308,7 @@ func StateStages(ctx context.Context, headers HeadersCfg, bodies BodiesCfg, bloc
return SpawnBlockHashStage(s, tx, blockHashCfg, ctx, logger)
},
Unwind: func(u *UnwindState, s *StageState, sd *execctx.SharedDomains, tx kv.TemporalRwTx, logger log.Logger) error {
return UnwindBlockHashStage(u, tx, blockHashCfg, ctx)
return UnwindBlockHashStage(u, tx)
},
},
{
Expand Down
17 changes: 4 additions & 13 deletions execution/stagedsync/stage_blockhashes.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,17 @@ import (
"github.com/erigontech/erigon/common/log/v3"
"github.com/erigontech/erigon/db/kv"
"github.com/erigontech/erigon/db/rawdb/blockio"
"github.com/erigontech/erigon/execution/chain"
"github.com/erigontech/erigon/execution/stagedsync/stages"
)

type BlockHashesCfg struct {
db kv.RwDB
tmpDir string
cc *chain.Config

tmpDir string
headerWriter *blockio.BlockWriter
}

func StageBlockHashesCfg(db kv.RwDB, tmpDir string, cc *chain.Config, headerWriter *blockio.BlockWriter) BlockHashesCfg {
func StageBlockHashesCfg(tmpDir string, headerWriter *blockio.BlockWriter) BlockHashesCfg {
return BlockHashesCfg{
db: db,
tmpDir: tmpDir,
cc: cc,
headerWriter: headerWriter,
}
}
Expand All @@ -62,9 +56,6 @@ func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx cont
return nil
}

func UnwindBlockHashStage(u *UnwindState, tx kv.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) {
if err = u.Done(tx); err != nil {
return fmt.Errorf(" reset: %w", err)
}
return nil
func UnwindBlockHashStage(u *UnwindState, tx kv.RwTx) error {
return u.Done(tx)
}
17 changes: 6 additions & 11 deletions execution/stagedsync/stage_bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
const requestLoopCutOff int = 1

type BodiesCfg struct {
db kv.RwDB
bd *bodydownload.BodyDownload
bodyReqSend func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool)
penalise func(context.Context, []headerdownload.PenaltyItem)
Expand All @@ -52,15 +51,15 @@ type BodiesCfg struct {
blockWriter *blockio.BlockWriter
}

func StageBodiesCfg(db kv.RwDB, bd *bodydownload.BodyDownload,
func StageBodiesCfg(bd *bodydownload.BodyDownload,
bodyReqSend func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool), penalise func(context.Context, []headerdownload.PenaltyItem),
blockPropagator bodydownload.BlockPropagator, timeout int,
chanConfig *chain.Config,
blockReader services.FullBlockReader,
blockWriter *blockio.BlockWriter,
) BodiesCfg {
return BodiesCfg{
db: db, bd: bd, bodyReqSend: bodyReqSend, penalise: penalise, blockPropagator: blockPropagator,
bd: bd, bodyReqSend: bodyReqSend, penalise: penalise, blockPropagator: blockPropagator,
timeout: timeout, chanConfig: chanConfig, blockReader: blockReader,
blockWriter: blockWriter}
}
Expand Down Expand Up @@ -378,15 +377,11 @@ func logWritingBodies(logPrefix string, committed, headerProgress uint64, logger
)
}

func UnwindBodiesStage(u *UnwindState, tx kv.RwTx, cfg BodiesCfg, ctx context.Context) (err error) {
func UnwindBodiesStage(u *UnwindState, tx kv.RwTx, cfg BodiesCfg) error {
u.UnwindPoint = max(u.UnwindPoint, cfg.blockReader.FrozenBlocks()) // protect from unwind behind files
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
if err := cfg.blockWriter.MakeBodiesNonCanonical(tx, u.UnwindPoint+1); err != nil {
err := cfg.blockWriter.MakeBodiesNonCanonical(tx, u.UnwindPoint+1)
if err != nil {
return err
}
if err = u.Done(tx); err != nil {
return err
}
return nil
return u.Done(tx)
}
6 changes: 1 addition & 5 deletions execution/stagedsync/stage_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,13 @@ import (
)

type FinishCfg struct {
db kv.RwDB
tmpDir string
forkValidator *engine_helpers.ForkValidator
initialCycleStart *time.Time
}

func StageFinishCfg(db kv.RwDB, tmpDir string, forkValidator *engine_helpers.ForkValidator) FinishCfg {
func StageFinishCfg(forkValidator *engine_helpers.ForkValidator) FinishCfg {
initialCycleStart := time.Now()
return FinishCfg{
db: db,
tmpDir: tmpDir,
forkValidator: forkValidator,
initialCycleStart: &initialCycleStart,
}
Expand Down
33 changes: 2 additions & 31 deletions execution/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,81 +25,52 @@ import (
"runtime"
"time"

"github.com/c2h5oh/datasize"

"github.com/erigontech/erigon/common"
"github.com/erigontech/erigon/common/dbg"
"github.com/erigontech/erigon/common/hexutil"
"github.com/erigontech/erigon/common/log/v3"
"github.com/erigontech/erigon/db/kv"
"github.com/erigontech/erigon/db/rawdb"
"github.com/erigontech/erigon/db/rawdb/blockio"
"github.com/erigontech/erigon/db/services"
"github.com/erigontech/erigon/db/state/execctx"
"github.com/erigontech/erigon/diagnostics/diaglib"
"github.com/erigontech/erigon/execution/chain"
"github.com/erigontech/erigon/execution/rlp"
"github.com/erigontech/erigon/execution/stagedsync/bodydownload"
"github.com/erigontech/erigon/execution/stagedsync/headerdownload"
"github.com/erigontech/erigon/execution/types"
"github.com/erigontech/erigon/node/ethconfig"
"github.com/erigontech/erigon/node/shards"
)

// The number of blocks we should be able to re-org sub-second on commodity hardware.
// See https://hackmd.io/TdJtNs0dS56q-In8h-ShSg
const ShortPoSReorgThresholdBlocks = 10

type HeadersCfg struct {
db kv.RwDB
hd *headerdownload.HeaderDownload
bodyDownload *bodydownload.BodyDownload
chainConfig *chain.Config
headerReqSend func(context.Context, *headerdownload.HeaderRequest) ([64]byte, bool)
announceNewHashes func(context.Context, []headerdownload.Announce)
penalize func(context.Context, []headerdownload.PenaltyItem)
batchSize datasize.ByteSize
noP2PDiscovery bool
tmpdir string

blockReader services.FullBlockReader
blockWriter *blockio.BlockWriter
notifications *shards.Notifications

syncConfig ethconfig.Sync
blockReader services.FullBlockReader
syncConfig ethconfig.Sync
}

func StageHeadersCfg(
db kv.RwDB,
headerDownload *headerdownload.HeaderDownload,
bodyDownload *bodydownload.BodyDownload,
chainConfig *chain.Config,
syncConfig ethconfig.Sync,
headerReqSend func(context.Context, *headerdownload.HeaderRequest) ([64]byte, bool),
announceNewHashes func(context.Context, []headerdownload.Announce),
penalize func(context.Context, []headerdownload.PenaltyItem),
batchSize datasize.ByteSize,
noP2PDiscovery bool,
blockReader services.FullBlockReader,
blockWriter *blockio.BlockWriter,
tmpdir string,
notifications *shards.Notifications,
) HeadersCfg {
return HeadersCfg{
db: db,
hd: headerDownload,
bodyDownload: bodyDownload,
chainConfig: chainConfig,
syncConfig: syncConfig,
headerReqSend: headerReqSend,
announceNewHashes: announceNewHashes,
penalize: penalize,
batchSize: batchSize,
tmpdir: tmpdir,
noP2PDiscovery: noP2PDiscovery,
blockReader: blockReader,
blockWriter: blockWriter,
notifications: notifications,
}
}

Expand Down
6 changes: 0 additions & 6 deletions execution/stagedsync/stage_mining_create_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,30 +123,24 @@ func NewMiningState(cfg *buildercfg.MiningConfig) MiningState {
}

type MiningCreateBlockCfg struct {
db kv.RwDB
miner MiningState
chainConfig *chain.Config
engine rules.Engine
tmpdir string
blockBuilderParameters *builder.Parameters
blockReader services.FullBlockReader
}

func StageMiningCreateBlockCfg(
db kv.RwDB,
miner MiningState,
chainConfig *chain.Config,
engine rules.Engine,
blockBuilderParameters *builder.Parameters,
tmpdir string,
blockReader services.FullBlockReader,
) MiningCreateBlockCfg {
return MiningCreateBlockCfg{
db: db,
miner: miner,
chainConfig: chainConfig,
engine: engine,
tmpdir: tmpdir,
blockBuilderParameters: blockBuilderParameters,
blockReader: blockReader,
}
Expand Down
3 changes: 0 additions & 3 deletions execution/stagedsync/stage_mining_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
)

type MiningExecCfg struct {
db kv.RwDB
miningState MiningState
notifier ChainEventNotifier
chainConfig *chain.Config
Expand All @@ -64,7 +63,6 @@ type MiningExecCfg struct {
}

func StageMiningExecCfg(
db kv.RwDB,
miningState MiningState,
notifier ChainEventNotifier,
chainConfig *chain.Config,
Expand All @@ -77,7 +75,6 @@ func StageMiningExecCfg(
blockReader services.FullBlockReader,
) MiningExecCfg {
return MiningExecCfg{
db: db,
miningState: miningState,
notifier: notifier,
chainConfig: chainConfig,
Expand Down
Loading
Loading