-
Notifications
You must be signed in to change notification settings - Fork 226
added commitInMemory method for synching optimization #7674
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feat/supernova-async-exec
Are you sure you want to change the base?
Changes from 3 commits
b6ca39f
ac5b918
3d4fc43
72604f7
add1e7e
f090fe5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,15 @@ import ( | |
|
|
||
| const ( | ||
| cleanupHeadersDelta = 5 | ||
|
|
||
| // defaultSyncCommitInterval defines how many blocks to process before committing to disk during sync. | ||
| // Setting to 0 disables the optimization (commits every block). | ||
| // Higher values improve sync speed but increase memory usage and data loss risk on crash. | ||
| defaultSyncCommitInterval = uint64(10) | ||
|
|
||
| // syncThresholdNonces defines how many nonces behind the network the node must be | ||
| // to be considered "syncing" and use the commit interval optimization. | ||
| syncThresholdNonces = uint64(50) | ||
| ) | ||
|
|
||
| var log = logger.GetOrCreate("process/block") | ||
|
|
@@ -145,6 +154,11 @@ type baseProcessor struct { | |
| gasComputation process.GasComputation | ||
| executionManager process.ExecutionManager | ||
| txExecutionOrderHandler common.TxExecutionOrderHandler | ||
|
|
||
| // Sync commit optimization fields | ||
| syncCommitInterval uint64 | ||
| blocksSinceLastCommit uint64 | ||
| mutSyncCommit sync.Mutex | ||
| } | ||
|
|
||
| type bootStorerDataArgs struct { | ||
|
|
@@ -234,6 +248,7 @@ func NewBaseProcessor(arguments ArgBaseProcessor) (*baseProcessor, error) { | |
| gasComputation: arguments.GasComputation, | ||
| executionManager: arguments.ExecutionManager, | ||
| txExecutionOrderHandler: arguments.TxExecutionOrderHandler, | ||
| syncCommitInterval: defaultSyncCommitInterval, | ||
| } | ||
|
|
||
| err = base.OnExecutedBlock(genesisHdr, genesisHdr.GetRootHash()) | ||
|
|
@@ -2109,21 +2124,99 @@ func (bp *baseProcessor) RevertAccountsDBToSnapshot(accountsSnapshot map[state.A | |
|
|
||
| func (bp *baseProcessor) commitState(headerHandler data.HeaderHandler) error { | ||
| startTime := time.Now() | ||
| inMemory := true | ||
| defer func() { | ||
| elapsedTime := time.Since(startTime) | ||
| log.Debug("elapsed time to commit accounts state", | ||
| "time [s]", elapsedTime, | ||
| "header nonce", headerHandler.GetNonce(), | ||
| "in memory", inMemory, | ||
| ) | ||
| }() | ||
|
|
||
| if headerHandler.IsStartOfEpochBlock() { | ||
| bp.resetSyncCommitCounter() | ||
| return bp.commitInLastEpoch(headerHandler.GetEpoch()) | ||
| } | ||
|
|
||
| // Check if we should use sync commit optimization | ||
| if bp.shouldUseSyncCommitOptimization(headerHandler) { | ||
| return bp.commitInMemory() | ||
| } | ||
| inMemory = false | ||
| return bp.commit() | ||
| } | ||
|
|
||
| // shouldUseSyncCommitOptimization checks if the node is syncing and should use | ||
| // the in-memory commit optimization to improve sync speed. | ||
| func (bp *baseProcessor) shouldUseSyncCommitOptimization(headerHandler data.HeaderHandler) bool { | ||
| // Disabled if syncCommitInterval is 0 | ||
| if bp.syncCommitInterval == 0 { | ||
| return false | ||
| } | ||
|
Comment on lines
+2165
to
+2168
|
||
|
|
||
| // Check if node is syncing (far behind the network) | ||
| probableHighestNonce := bp.forkDetector.ProbableHighestNonce() | ||
| currentNonce := headerHandler.GetNonce() | ||
| noncesBehind := uint64(0) | ||
| if probableHighestNonce > currentNonce { | ||
| noncesBehind = probableHighestNonce - currentNonce | ||
| } | ||
|
|
||
| // Not syncing - commit every block | ||
| if noncesBehind < syncThresholdNonces { | ||
| bp.resetSyncCommitCounter() | ||
| return false | ||
| } | ||
|
|
||
| // Syncing - use commit interval | ||
| bp.mutSyncCommit.Lock() | ||
| defer bp.mutSyncCommit.Unlock() | ||
|
|
||
| bp.blocksSinceLastCommit++ | ||
|
|
||
| // Time for a full commit | ||
| if bp.blocksSinceLastCommit >= bp.syncCommitInterval { | ||
| bp.blocksSinceLastCommit = 0 | ||
| log.Debug("sync commit optimization: performing full commit", | ||
| "nonces_behind", noncesBehind, | ||
| "interval", bp.syncCommitInterval) | ||
| return false | ||
| } | ||
|
|
||
| log.Debug("sync commit optimization: using in-memory commit", | ||
| "nonces_behind", noncesBehind, | ||
| "blocks_since_commit", bp.blocksSinceLastCommit) | ||
| return true | ||
| } | ||
|
|
||
| func (bp *baseProcessor) resetSyncCommitCounter() { | ||
| bp.mutSyncCommit.Lock() | ||
| bp.blocksSinceLastCommit = 0 | ||
| bp.mutSyncCommit.Unlock() | ||
| } | ||
|
|
||
| func (bp *baseProcessor) commitInMemory() error { | ||
| for key := range bp.accountsDB { | ||
| _, err := bp.accountsDB[key].CommitInMemory() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // SetSyncCommitInterval sets the commit interval for sync optimization. | ||
| // Set to 0 to disable the optimization (commit every block). | ||
| // Higher values improve sync speed but increase memory usage and data loss risk on crash. | ||
| func (bp *baseProcessor) SetSyncCommitInterval(interval uint64) { | ||
| bp.mutSyncCommit.Lock() | ||
| bp.syncCommitInterval = interval | ||
| bp.mutSyncCommit.Unlock() | ||
| log.Debug("sync commit interval updated", "interval", interval) | ||
| } | ||
|
|
||
| func (bp *baseProcessor) commitInLastEpoch(currentEpoch uint32) error { | ||
| lastEpoch := uint32(0) | ||
| if currentEpoch > 0 { | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -5549,3 +5549,208 @@ func TestBaseProcessor_excludeRevertedExecutionResultsForHeader(t *testing.T) { | |||||
| require.Equal(t, pendingExecutionResults, sanitizedPendingExecResults) | ||||||
| }) | ||||||
| } | ||||||
|
|
||||||
| // ------- Sync Commit Optimization Tests | ||||||
|
|
||||||
| func TestBaseProcessor_ShouldUseSyncCommitOptimization_DisabledWhenIntervalZero(t *testing.T) { | ||||||
| t.Parallel() | ||||||
|
|
||||||
| coreComponents, dataComponents, bootstrapComponents, statusComponents := createComponentHolderMocks() | ||||||
| arguments := createArgBaseProcessor(coreComponents, dataComponents, bootstrapComponents, statusComponents) | ||||||
| arguments.ForkDetector = &mock.ForkDetectorMock{ | ||||||
| ProbableHighestNonceCalled: func() uint64 { | ||||||
| return 1000 // Far behind | ||||||
| }, | ||||||
| } | ||||||
|
|
||||||
| sp, err := blproc.NewShardProcessor(blproc.ArgShardProcessor{ArgBaseProcessor: arguments}) | ||||||
| require.NoError(t, err) | ||||||
|
|
||||||
| // Disable the optimization | ||||||
| sp.SetSyncCommitIntervalForTest(0) | ||||||
|
|
||||||
| header := &block.Header{ | ||||||
| Nonce: 100, // Far behind network | ||||||
| } | ||||||
|
|
||||||
| // Should return false because interval is 0 | ||||||
| result := sp.ShouldUseSyncCommitOptimization(header) | ||||||
| assert.False(t, result, "should return false when sync commit interval is 0") | ||||||
| } | ||||||
|
|
||||||
| func TestBaseProcessor_ShouldUseSyncCommitOptimization_DisabledWhenNotSyncing(t *testing.T) { | ||||||
| t.Parallel() | ||||||
|
|
||||||
| coreComponents, dataComponents, bootstrapComponents, statusComponents := createComponentHolderMocks() | ||||||
| arguments := createArgBaseProcessor(coreComponents, dataComponents, bootstrapComponents, statusComponents) | ||||||
| arguments.ForkDetector = &mock.ForkDetectorMock{ | ||||||
| ProbableHighestNonceCalled: func() uint64 { | ||||||
| return 105 // Only 5 ahead | ||||||
| }, | ||||||
| } | ||||||
|
|
||||||
| sp, err := blproc.NewShardProcessor(blproc.ArgShardProcessor{ArgBaseProcessor: arguments}) | ||||||
| require.NoError(t, err) | ||||||
|
|
||||||
| sp.SetSyncCommitIntervalForTest(10) | ||||||
|
|
||||||
| header := &block.Header{ | ||||||
| Nonce: 100, // Not far behind network (less than syncThresholdNonces) | ||||||
| } | ||||||
|
|
||||||
| // Should return false because node is not syncing | ||||||
| result := sp.ShouldUseSyncCommitOptimization(header) | ||||||
| assert.False(t, result, "should return false when not syncing (nonces behind < threshold)") | ||||||
| } | ||||||
|
|
||||||
| func TestBaseProcessor_ShouldUseSyncCommitOptimization_UsesInMemoryWhenSyncing(t *testing.T) { | ||||||
| t.Parallel() | ||||||
|
|
||||||
| coreComponents, dataComponents, bootstrapComponents, statusComponents := createComponentHolderMocks() | ||||||
| arguments := createArgBaseProcessor(coreComponents, dataComponents, bootstrapComponents, statusComponents) | ||||||
| arguments.ForkDetector = &mock.ForkDetectorMock{ | ||||||
| ProbableHighestNonceCalled: func() uint64 { | ||||||
| return 200 // Far ahead (100 nonces behind) | ||||||
| }, | ||||||
| } | ||||||
|
|
||||||
| sp, err := blproc.NewShardProcessor(blproc.ArgShardProcessor{ArgBaseProcessor: arguments}) | ||||||
| require.NoError(t, err) | ||||||
|
|
||||||
| sp.SetSyncCommitIntervalForTest(10) | ||||||
|
|
||||||
| header := &block.Header{ | ||||||
| Nonce: 100, | ||||||
| } | ||||||
|
|
||||||
| // First call should return true (in-memory commit) | ||||||
| result := sp.ShouldUseSyncCommitOptimization(header) | ||||||
| assert.True(t, result, "should return true for first block when syncing") | ||||||
| assert.Equal(t, uint64(1), sp.GetBlocksSinceLastCommit()) | ||||||
| } | ||||||
|
|
||||||
| func TestBaseProcessor_ShouldUseSyncCommitOptimization_FullCommitAtInterval(t *testing.T) { | ||||||
| t.Parallel() | ||||||
|
|
||||||
| coreComponents, dataComponents, bootstrapComponents, statusComponents := createComponentHolderMocks() | ||||||
| arguments := createArgBaseProcessor(coreComponents, dataComponents, bootstrapComponents, statusComponents) | ||||||
| arguments.ForkDetector = &mock.ForkDetectorMock{ | ||||||
| ProbableHighestNonceCalled: func() uint64 { | ||||||
| return 200 // Far ahead | ||||||
| }, | ||||||
| } | ||||||
|
|
||||||
| sp, err := blproc.NewShardProcessor(blproc.ArgShardProcessor{ArgBaseProcessor: arguments}) | ||||||
| require.NoError(t, err) | ||||||
|
|
||||||
| sp.SetSyncCommitIntervalForTest(5) | ||||||
|
|
||||||
| header := &block.Header{ | ||||||
| Nonce: 100, | ||||||
| } | ||||||
|
|
||||||
| // Call 5 times - first 4 should return true (in-memory), 5th should return false (full commit) | ||||||
| for i := 0; i < 4; i++ { | ||||||
| result := sp.ShouldUseSyncCommitOptimization(header) | ||||||
| assert.True(t, result, "should return true for block %d", i+1) | ||||||
|
||||||
| assert.True(t, result, "should return true for block %d", i+1) | |
| assert.Truef(t, result, "should return true for block %d", i+1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the start-of-epoch path, the deferred log will still print
in memory=truebecauseinMemoryis initialized to true and never set to false before returningcommitInLastEpoch(). This makes the log line misleading (epoch commits are persisted). SetinMemory=falsebefore the early return, or initialize it to false and only set true on the in-memory path.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed