39 commits
f412673
eth: bug fix: fixed compatibility between wit/1 and wit/2 caused by u…
pratikspatil024 Nov 3, 2025
8a2eff7
Merge branch 'psp-pos-2947' of https://github.com/maticnetwork/bor in…
pratikspatil024 Nov 3, 2025
cc51c74
Merge branch 'psp-pos-2947' of https://github.com/maticnetwork/bor in…
pratikspatil024 Nov 6, 2025
4e4ad6b
Merge branch 'psp-pos-2947' of https://github.com/maticnetwork/bor in…
pratikspatil024 Nov 6, 2025
0d1392d
logs
pratikspatil024 Nov 10, 2025
cda5583
logs
pratikspatil024 Nov 10, 2025
fed119a
bug fix
pratikspatil024 Nov 10, 2025
f1923b4
Merge branch 'psp-pos-2947' of https://github.com/maticnetwork/bor in…
pratikspatil024 Nov 10, 2025
278f9ec
Merge branch 'psp-pos-2947' of https://github.com/maticnetwork/bor in…
pratikspatil024 Nov 10, 2025
ff57930
Merge branch 'psp-pos-2947' of https://github.com/maticnetwork/bor in…
pratikspatil024 Nov 13, 2025
e5fc923
logs
pratikspatil024 Nov 13, 2025
b2d9ec4
remove additional WriteCompactWitness
pratikspatil024 Nov 13, 2025
0110136
Merge branch 'psp-pos-2947' of https://github.com/maticnetwork/bor in…
pratikspatil024 Nov 13, 2025
8b610ed
Merge branch 'psp-pos-2947' of https://github.com/maticnetwork/bor in…
pratikspatil024 Nov 17, 2025
b121a98
eth: bug fix in import logic after restart
pratikspatil024 Nov 17, 2025
9511f37
eth: bug fix in import logic after restart (part 2)
pratikspatil024 Nov 17, 2025
fc17e61
core: bug fix in import logic after restart
pratikspatil024 Nov 17, 2025
6ffc379
eth: logs
pratikspatil024 Nov 17, 2025
d86fdf7
eth: bug fix in import logic after restart (part 3)
pratikspatil024 Nov 17, 2025
54049fc
eth: bug fix in import logic after restart (part 4)
pratikspatil024 Nov 17, 2025
3274bdc
eth: bug fix in import logic after restart (part 5)
pratikspatil024 Nov 17, 2025
69c8970
logs
pratikspatil024 Nov 18, 2025
7fdf276
eth: bug fix in import logic after restart (part 6)
pratikspatil024 Nov 18, 2025
340e43f
eth: bug fix in import logic after restart (part 7)
pratikspatil024 Nov 18, 2025
75bd0df
eth: optimised by having useCompact flag per block
pratikspatil024 Nov 19, 2025
bbdd85c
eth/downloader: limit witness requests and import batch size when cac…
pratikspatil024 Nov 19, 2025
7ee61ec
eth/downloader: limiting witness requests when cache is cold
pratikspatil024 Nov 20, 2025
9b07ace
eth/downloader: bug fix
pratikspatil024 Nov 20, 2025
16c18b3
eth/downloader: logs for bug fix
pratikspatil024 Nov 20, 2025
5166974
eth: logs to debug
pratikspatil024 Nov 20, 2025
4c6aa1c
eth/downloader: improvements in reserve
pratikspatil024 Nov 20, 2025
765fe5b
eth/downloader: improvements in reserve (part 2)
pratikspatil024 Nov 20, 2025
344a264
eth/downloader: improvements in reserve (part 3)
pratikspatil024 Nov 20, 2025
2824012
eth/downloader: bug fix
pratikspatil024 Nov 20, 2025
591971d
eth/downloader: revert some logic to make sync work
pratikspatil024 Nov 20, 2025
68c31b4
refactor
pratikspatil024 Nov 20, 2025
0580874
cleanup logs
pratikspatil024 Nov 20, 2025
4997fa3
fix lint
pratikspatil024 Nov 21, 2025
4ae43ef
test
pratikspatil024 Nov 21, 2025
112 changes: 107 additions & 5 deletions core/blockchain.go
@@ -4229,6 +4229,58 @@
return (blockNum / compactWitnessCacheWindowSize) * compactWitnessCacheWindowSize
}

// ShouldUseCompactWitness is a pure function that determines if a compact witness should be used
// for a given block number based on cache state and window configuration.
// This function is shared between eth and downloader packages to avoid code duplication.
//
// Parameters:
// - cacheWarm: Whether the cache has been warmed (e.g., window-start block processed)
// - parallel: Whether parallel stateless import is enabled (cache not maintained in parallel mode)
// - blockNum: The block number to check
// - windowSize: Size of the cache window (consensus constant)
// - overlapSize: Size of overlap between windows (consensus constant)
//
// Returns true if compact witness should be used, false for full witness.
func ShouldUseCompactWitness(cacheWarm bool, parallel bool, blockNum, windowSize, overlapSize uint64) bool {
// Compact witness requires sequential block processing.
if parallel {
return false
}
// If the compact cache hasn't been warmed yet (e.g. node restarted mid-window),
// request full witnesses until we process the next window start.
if !cacheWarm {
return false
}
// Without a valid window, fall back to full witnesses.
if windowSize == 0 {
return false
}

// Calculate the consensus-aligned window start for the requested block.
expectedWindowStart := (blockNum / windowSize) * windowSize
if blockNum == expectedWindowStart {
return false
}

// Calculate the first block of the overlap region for this window.
if overlapSize == 0 {
return true
}
var overlapStartOffset uint64
if overlapSize >= windowSize {
overlapStartOffset = 0
} else {
overlapStartOffset = windowSize - overlapSize
}
blockAtOverlapStart := expectedWindowStart + overlapStartOffset
if blockNum == blockAtOverlapStart {
return false
}

// Block is within window (not at window start or overlap start), can use compact witness.
return true
}

// manageSlidingWindow handles window sliding logic.
// Returns true if window was slid, false otherwise.
func (bc *BlockChain) manageSlidingWindow(blockNum uint64) bool {
@@ -4276,6 +4328,13 @@
return compactWitnessCacheOverlapSize
}

// GetCompactCacheSizes returns the current sizes of the active and next compact witness caches.
func (bc *BlockChain) GetCompactCacheSizes() (int, int) {
bc.cacheLock.RLock()
defer bc.cacheLock.RUnlock()
return len(bc.activeCacheMap), len(bc.nextCacheMap)
}

// CalculateCacheWindowStart returns the consensus-aligned window start for the provided block number.
func (bc *BlockChain) CalculateCacheWindowStart(blockNum uint64) uint64 {
return calculateCacheWindowStart(blockNum)
@@ -4284,6 +4343,13 @@
// IsCompactCacheWarm reports whether we've already refilled the cache for the
// current window by processing a full witness (after startup or the latest slide).
func (bc *BlockChain) IsCompactCacheWarm() bool {
bc.cacheLock.RLock()
defer bc.cacheLock.RUnlock()
// If cacheWindowStart is 0, the cache hasn't been initialized yet (e.g., after restart)
// In this case, the cache is definitely not warm
if bc.cacheWindowStart == 0 {
return false
}
return bc.compactCacheWarm.Load()
}

Expand Down Expand Up @@ -4348,6 +4414,8 @@

cachedToActive := 0
cachedToNext := 0
originalStateCount := len(originalWitnessStates)
executionNodesProcessed := 0

// Cache ORIGINAL witness state nodes (not merged states).
// This ensures we only cache new states for this block, not re-cache old states.
@@ -4378,6 +4446,7 @@
subset.ForEachWithOrder(func(path string, n *trienode.Node) {
if !n.IsDeleted() && n.Blob != nil {
stateNode := string(n.Blob)
executionNodesProcessed++
// Add to active map
if _, exists := bc.activeCacheMap[stateNode]; !exists {
bc.activeCacheMap[stateNode] = struct{}{}
@@ -4405,6 +4474,8 @@
"block", blockNum,
"windowStart", bc.cacheWindowStart,
"inOverlap", inOverlapPeriod,
"originalWitnessStates", originalStateCount,
"executionNodesProcessed", executionNodesProcessed,
"cachedToActive", cachedToActive,
"cachedToNext", cachedToNext,
"activeMapSize", len(bc.activeCacheMap),
@@ -4412,14 +4483,41 @@
}

// ProcessBlockWithWitnesses processes a block in stateless mode using the provided witnesses.
func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *stateless.Witness) (*state.StateDB, *ProcessResult, error) {

Check failure on line 4486 in core/blockchain.go (SonarCloud Code Analysis): Refactor this method to reduce its Cognitive Complexity from 25 to the 15 allowed. See https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZqhLImNWZpZkJwXvdUd&open=AZqhLImNWZpZkJwXvdUd&pullRequest=1902
if witness == nil {
return nil, nil, errors.New("nil witness")
}

blockNum := block.Number().Uint64()
parallelMode := bc.parallelStatelessImportEnabled.Load()

hasWitness := witness != nil
incomingStateCount := 0
if hasWitness {
incomingStateCount = len(witness.State)
}
var witnessBytes int
if hasWitness {
var buf bytes.Buffer
if err := witness.EncodeRLP(&buf); err == nil {
witnessBytes = buf.Len()
} else {
log.Warn("Failed to measure witness payload size", "block", blockNum, "err", err)
}
}

activeCacheSize, nextCacheSize := bc.GetCompactCacheSizes()

log.Debug("Processing block with witness",
"block", blockNum,
"parallelMode", parallelMode,
"hasWitness", hasWitness,
"incomingStates", incomingStateCount,
"payloadBytes", witnessBytes,
"activeCacheSize", activeCacheSize,
"nextCacheSize", nextCacheSize,
"windowStart", bc.GetCacheWindowStart())

if !hasWitness {
return nil, nil, errors.New("nil witness")
}

// Track original witness states before any merging (for cache update later)
// We only want to cache NEW states from this block, not re-cache merged states
var originalWitnessStates map[string]struct{}
@@ -4442,11 +4540,15 @@
witnessStatesBefore := len(witness.State)
mergedCount := bc.mergeSpanCacheIntoWitness(witness)
if mergedCount > 0 {
activeSize, nextSize := bc.GetCompactCacheSizes()
log.Debug("Merged cached states into witness",
"block", blockNum,
"witnessStatesBefore", witnessStatesBefore,
"mergedFromCache", mergedCount,
"witnessStatesAfter", len(witness.State))
"witnessStatesAfter", len(witness.State),
"activeCacheSize", activeSize,
"nextCacheSize", nextSize,
"windowStart", bc.GetCacheWindowStart())
}
}
}
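
For reference, a minimal standalone sketch of the window/overlap decision that ShouldUseCompactWitness above implements. The 256-block window and 16-block overlap are illustrative stand-ins for compactWitnessCacheWindowSize and compactWitnessCacheOverlapSize, whose actual values are not shown in this diff.

package main

import "fmt"

// shouldUseCompactWitness mirrors core.ShouldUseCompactWitness from this PR.
func shouldUseCompactWitness(cacheWarm, parallel bool, blockNum, windowSize, overlapSize uint64) bool {
	if parallel || !cacheWarm || windowSize == 0 {
		return false
	}
	windowStart := (blockNum / windowSize) * windowSize
	if blockNum == windowStart {
		return false // window-start block must carry a full witness to (re)fill the cache
	}
	if overlapSize == 0 {
		return true
	}
	offset := uint64(0)
	if overlapSize < windowSize {
		offset = windowSize - overlapSize
	}
	if blockNum == windowStart+offset {
		return false // first block of the overlap region also needs a full witness
	}
	return true
}

func main() {
	const window, overlap = 256, 16 // illustrative values only
	for _, n := range []uint64{512, 513, 752, 753} {
		fmt.Printf("block %d -> compact=%v\n", n, shouldUseCompactWitness(true, false, n, window, overlap))
	}
	// block 512 -> compact=false (window start)
	// block 513 -> compact=true
	// block 752 -> compact=false (overlap start: 512 + (256-16))
	// block 753 -> compact=true
}

Window-start and overlap-start blocks always come back as full witnesses so the active and next caches can be refilled; every other block in a warm, sequentially processed window can use a compact witness.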
71 changes: 67 additions & 4 deletions eth/downloader/bor_downloader.go
@@ -244,6 +244,13 @@
// TrieDB retrieves the low level trie database used for interacting
// with trie nodes.
TrieDB() *triedb.Database

// Compact witness cache methods
IsCompactCacheWarm() bool
GetCacheWindowStart() uint64
GetCacheWindowSize() uint64
GetCacheOverlapSize() uint64
IsParallelStatelessImportEnabled() bool
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
Expand Down Expand Up @@ -2293,7 +2300,7 @@
}

// processFullSyncContentStateless takes fetch results from the queue and imports them into the chain using stateless mode.
func (d *Downloader) processFullSyncContentStateless() error {

Check failure on line 2303 in eth/downloader/bor_downloader.go (SonarCloud Code Analysis): Refactor this method to reduce its Cognitive Complexity from 26 to the 15 allowed. See https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZqhLIsaWZpZkJwXvdUe&open=AZqhLIsaWZpZkJwXvdUe&pullRequest=1902
for {
results := d.queue.Results(true)
if len(results) == 0 {
@@ -2312,10 +2319,66 @@
// Otherwise continue waiting for results
continue
}
// Process the retrieved block segments
if err := d.importBlockResultsStateless(results); err != nil {
log.Warn("Block import failed", "err", err)
return err

// Optimization: When cache is cold, limit import batch size to the next window-start block.
// This ensures that after a window-start block is processed and cache becomes warm,
// subsequent blocks can use compact witnesses instead of all being imported with full witnesses.
windowSize := d.blockchain.GetCacheWindowSize()
cacheWarm := d.blockchain.IsCompactCacheWarm()
maxBatchSize := maxResultsProcess

if !cacheWarm && windowSize > 0 && len(results) > 0 {
// Get the first block in the batch to calculate how many blocks until next window-start
firstBlockNum := results[0].Header.Number.Uint64()
currentHead := d.blockchain.CurrentBlock()

if currentHead != nil {
currentHeadNum := currentHead.Number.Uint64()
// Calculate the next window-start block
nextWindowStart := ((currentHeadNum / windowSize) + 1) * windowSize

// Limit batch to blocks up to (and including) the next window-start block
// This minimizes wasted full witnesses by switching to compact as soon as cache warms
blocksUntilWindowStart := nextWindowStart - firstBlockNum + 1 // +1 to include window-start

if blocksUntilWindowStart > 0 && blocksUntilWindowStart < uint64(maxBatchSize) {
maxBatchSize = int(blocksUntilWindowStart)
log.Debug("Limiting import batch size when cache is cold",
"originalBatchSize", len(results),
"limitedBatchSize", maxBatchSize,
"windowSize", windowSize,
"currentHead", currentHeadNum,
"firstBlockInBatch", firstBlockNum,
"nextWindowStart", nextWindowStart,
"blocksUntilWindowStart", blocksUntilWindowStart,
"reason", "cache cold - limiting to next window-start to allow cache to warm")
} else if uint64(maxBatchSize) > windowSize {
// Fallback: if calculation fails, use windowSize as conservative limit
maxBatchSize = int(windowSize)
log.Debug("Limiting import batch size when cache is cold (fallback)",
"originalBatchSize", len(results),
"limitedBatchSize", maxBatchSize,
"windowSize", windowSize,
"reason", "cache cold - limiting to windowSize to allow cache to warm")
}
}
}

// Split large batches into smaller ones if needed
for len(results) > 0 {
batchSize := len(results)
if batchSize > maxBatchSize {
batchSize = maxBatchSize
}

batch := results[:batchSize]
results = results[batchSize:]

// Process the retrieved block segments
if err := d.importBlockResultsStateless(batch); err != nil {
log.Warn("Block import failed", "err", err)
return err
}
}
}
}
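
A condensed sketch of the cold-cache batch cap introduced in processFullSyncContentStateless. The 256-block window and the 2048 default cap are illustrative values, not the actual windowSize or maxResultsProcess constants used by the downloader.

package main

import "fmt"

// coldCacheBatchLimit reproduces the cap applied when the compact cache is cold: imports
// are limited to the next window-start block (inclusive) so the cache can warm before any
// further blocks are imported with full witnesses.
func coldCacheBatchLimit(currentHead, firstBlockInBatch, windowSize uint64, maxResultsProcess int) int {
	maxBatch := maxResultsProcess
	if windowSize == 0 {
		return maxBatch
	}
	nextWindowStart := ((currentHead / windowSize) + 1) * windowSize
	blocksUntilWindowStart := nextWindowStart - firstBlockInBatch + 1 // +1 includes the window-start block itself
	if blocksUntilWindowStart > 0 && blocksUntilWindowStart < uint64(maxBatch) {
		return int(blocksUntilWindowStart)
	}
	if uint64(maxBatch) > windowSize {
		return int(windowSize) // conservative fallback when the calculation does not apply
	}
	return maxBatch
}

func main() {
	// Head at block 700, first queued result is 701, window size 256, default cap 2048 (all illustrative).
	fmt.Println(coldCacheBatchLimit(700, 701, 256, 2048)) // next window start is 768, so the cap is 768-701+1 = 68
}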
5 changes: 5 additions & 0 deletions eth/downloader/bor_downloader_test.go
@@ -372,6 +372,11 @@ func (dlp *downloadTesterPeer) RequestWitnesses(hashes []common.Hash, sink chan
return nil, nil
}

// RequestWitnessesWithVerification implements Peer
func (dlp *downloadTesterPeer) RequestWitnessesWithVerification(hashes []common.Hash, sink chan *eth.Response, verifyPageCount func(common.Hash, uint64, string) bool, useCompact []bool, useCompactDefault bool) (*eth.Request, error) {
return nil, nil
}

// SupportsWitness implements Peer
func (dlp *downloadTesterPeer) SupportsWitness() bool {
return false
79 changes: 69 additions & 10 deletions eth/downloader/bor_fetchers_concurrent_witnesses.go
@@ -23,11 +23,38 @@ import (

// Assuming witnesses are related to types
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/stateless"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
)

// shouldRequestCompactWitnessForBlock determines if we should request a compact witness
// for a given block number. This replicates the logic from eth/handler_eth.go but
// works with the downloader's BlockChain interface.
func shouldRequestCompactWitnessForBlock(d *Downloader, blockNum uint64) bool {
windowSize := d.blockchain.GetCacheWindowSize()
overlapSize := d.blockchain.GetCacheOverlapSize()
cacheWarm := d.blockchain.IsCompactCacheWarm()

// If the cache is cold (e.g., after restart), always request full witnesses.
// The cache must be warmed by processing a window-start block with a full witness
// before we can safely use compact witnesses.
// Note: We do NOT optimize by checking if currentHeadNum >= blockWindowStart because
// after a restart, the cache is lost even if the head is past the window start.
// We must rely solely on IsCompactCacheWarm() which correctly returns false
// when cacheWindowStart == 0 (uninitialized after restart).

// Use the shared logic from core package
return core.ShouldUseCompactWitness(
cacheWarm,
d.blockchain.IsParallelStatelessImportEnabled(),
blockNum,
windowSize,
overlapSize,
)
}

// witnessQueue implements typedQueue and is a type adapter between the generic
// concurrent fetcher and the downloader.
type witnessQueue Downloader
@@ -64,8 +91,12 @@ func (q *witnessQueue) updateCapacity(peer *peerConnection, items int, span time
// from the download queue to the specified peer.
// Note: This assumes a 'ReserveWitnesses' method exists or will be added to downloader.queue.
func (q *witnessQueue) reserve(peer *peerConnection, items int) (*fetchRequest, bool, bool) {
// Assuming ReserveWitnesses returns *fetchRequest, bool, bool like ReserveBodies
return q.queue.ReserveWitnesses(peer, items) // Placeholder: Needs implementation in queue struct
// We don't limit requests here to avoid breaking the delivery mechanism.
// Instead, we rely on the import batch limiting in processFullSyncContentStateless to
// ensure we process a window-start block before importing too many blocks ahead.
// This is simpler and safer than modifying requests after they're reserved.
req, _, progress, throttle := q.queue.ReserveWitnesses(peer, items)
return req, progress, throttle
}

// unreserve is responsible for removing the current witness retrieval allocation
@@ -104,17 +135,45 @@ func (q *witnessQueue) request(peer *peerConnection, req *fetchRequest, resCh ch

peer.log.Trace("Requesting new batch of witnesses", "count", len(hashes), "from_hash", hashes[0])

// The implementation of Peer.RequestWitnesses (e.g., in eth/peer.go) is responsible
// for translating these hashes into appropriate wit protocol messages (e.g., GetWitnessRequest).
// This might involve grouping hashes or assuming protocol extensions.
return peer.peer.RequestWitnesses(hashes, resCh)
// Safety check: if cache is cold and block is too far ahead, log warning
// This is a fallback in case reserve() didn't catch it
d := (*Downloader)(q)
if !d.blockchain.IsCompactCacheWarm() && len(req.Headers) > 0 {
currentHead := d.blockchain.CurrentBlock()
if currentHead != nil {
windowSize := d.blockchain.GetCacheWindowSize()
if windowSize > 0 {
currentHeadNum := currentHead.Number.Uint64()
nextWindowStart := ((currentHeadNum / windowSize) + 1) * windowSize
firstBlockNum := req.Headers[0].Number.Uint64()

if firstBlockNum > nextWindowStart {
// Requesting witness for block ahead of next window-start when cache is cold
// This should have been limited by reserve()
}
}
}
}

// Determine useCompact for each block in the batch.
// This allows per-block determination of compact vs full witnesses within a single batch.
useCompact := make([]bool, len(req.Headers))
for i, header := range req.Headers {
blockNum := header.Number.Uint64()
useCompact[i] = shouldRequestCompactWitnessForBlock((*Downloader)(q), blockNum)
}

// Use RequestWitnessesWithVerification with per-block useCompact decisions.
// Note: We pass nil for verifyPageCount since the downloader doesn't have access to the handler's verification callback.
// We pass false as the fallback useCompactDefault (not used when useCompact slice is provided).
return peer.peer.RequestWitnessesWithVerification(hashes, resCh, nil, useCompact, false)
}

// deliver is responsible for taking a generic response packet from the concurrent
// fetcher, unpacking the witness data (using wit protocol definitions) and delivering
// it to the downloader's queue.
func (q *witnessQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
log.Trace("Delivering witness response", "peer", peer.id)
log.Trace("Delivering witness response", "peer", peer.id, "responseType", fmt.Sprintf("%T", packet.Res))
// Check the actual response type. Should be a pointer to WitnessPacketRLPPacket.
witPacketData, ok := packet.Res.([]*stateless.Witness) // Expect pointer type
if !ok {
Expand All @@ -133,11 +192,11 @@ func (q *witnessQueue) deliver(peer *peerConnection, packet *eth.Response) (int,

switch {
case err == nil && numWitnesses == 0:
peer.log.Trace("Requested witnesses delivered (empty batch)")
peer.log.Trace("Requested witnesses delivered (empty batch)", "peer", peer.id)
case err == nil:
peer.log.Trace("Delivered new batch of witnesses", "count", numWitnesses, "accepted", accepted)
peer.log.Trace("Delivered new batch of witnesses", "peer", peer.id, "count", numWitnesses, "accepted", accepted)
default:
peer.log.Debug("Failed to deliver retrieved witnesses", "err", err)
peer.log.Trace("Failed to deliver retrieved witnesses", "peer", peer.id, "err", err, "count", numWitnesses)
}

return accepted, err
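
The per-block flag construction in witnessQueue.request boils down to the loop below; decide stands in for shouldRequestCompactWitnessForBlock, and the package name is illustrative.

package witnessfetch

import "github.com/ethereum/go-ethereum/core/types"

// buildUseCompactFlags sketches the per-header decision loop: every header in a batch gets
// its own compact/full flag, so a single request can mix full witnesses (window start,
// overlap start, cold cache) with compact witnesses for the remaining blocks.
func buildUseCompactFlags(headers []*types.Header, decide func(blockNum uint64) bool) []bool {
	useCompact := make([]bool, len(headers))
	for i, header := range headers {
		useCompact[i] = decide(header.Number.Uint64())
	}
	return useCompact
}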
1 change: 1 addition & 0 deletions eth/downloader/peer.go
@@ -65,6 +65,7 @@ type Peer interface {
RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error)
RequestReceipts([]common.Hash, chan *eth.Response) (*eth.Request, error)
RequestWitnesses([]common.Hash, chan *eth.Response) (*eth.Request, error)
RequestWitnessesWithVerification([]common.Hash, chan *eth.Response, func(common.Hash, uint64, string) bool, []bool, bool) (*eth.Request, error)

// SupportsWitness returns true if the peer supports the witness protocol
SupportsWitness() bool