diff --git a/core/blockchain.go b/core/blockchain.go
index 8cf433770d..0edce1c818 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -4229,6 +4229,58 @@ func calculateCacheWindowStart(blockNum uint64) uint64 {
 	return (blockNum / compactWitnessCacheWindowSize) * compactWitnessCacheWindowSize
 }
 
+// ShouldUseCompactWitness is a pure function that determines if a compact witness should be used
+// for a given block number based on cache state and window configuration.
+// This function is shared between the eth and downloader packages to avoid code duplication.
+//
+// Parameters:
+//   - cacheWarm: whether the cache has been warmed (e.g., a window-start block has been processed)
+//   - parallel: whether parallel stateless import is enabled (the cache is not maintained in parallel mode)
+//   - blockNum: the block number to check
+//   - windowSize: size of the cache window (consensus constant)
+//   - overlapSize: size of the overlap between windows (consensus constant)
+//
+// Returns true if a compact witness should be used, false for a full witness.
+func ShouldUseCompactWitness(cacheWarm bool, parallel bool, blockNum, windowSize, overlapSize uint64) bool {
+	// Compact witness requires sequential block processing.
+	if parallel {
+		return false
+	}
+	// If the compact cache hasn't been warmed yet (e.g. node restarted mid-window),
+	// request full witnesses until we process the next window start.
+	if !cacheWarm {
+		return false
+	}
+	// Without a valid window, fall back to full witnesses.
+	if windowSize == 0 {
+		return false
+	}
+
+	// Calculate the consensus-aligned window start for the requested block.
+	expectedWindowStart := (blockNum / windowSize) * windowSize
+	if blockNum == expectedWindowStart {
+		return false
+	}
+
+	// Calculate the first block of the overlap region for this window.
+	if overlapSize == 0 {
+		return true
+	}
+	var overlapStartOffset uint64
+	if overlapSize >= windowSize {
+		overlapStartOffset = 0
+	} else {
+		overlapStartOffset = windowSize - overlapSize
+	}
+	blockAtOverlapStart := expectedWindowStart + overlapStartOffset
+	if blockNum == blockAtOverlapStart {
+		return false
+	}
+
+	// Block is within window (not at window start or overlap start), can use compact witness.
+	return true
+}
+
 // manageSlidingWindow handles window sliding logic.
 // Returns true if window was slid, false otherwise.
 func (bc *BlockChain) manageSlidingWindow(blockNum uint64) bool {
@@ -4276,6 +4328,13 @@ func (bc *BlockChain) GetCacheOverlapSize() uint64 {
 	return compactWitnessCacheOverlapSize
 }
 
+// GetCompactCacheSizes returns the current sizes of the active and next compact witness caches.
+func (bc *BlockChain) GetCompactCacheSizes() (int, int) {
+	bc.cacheLock.RLock()
+	defer bc.cacheLock.RUnlock()
+	return len(bc.activeCacheMap), len(bc.nextCacheMap)
+}
+
 // CalculateCacheWindowStart returns the consensus-aligned window start for the provided block number.
 func (bc *BlockChain) CalculateCacheWindowStart(blockNum uint64) uint64 {
 	return calculateCacheWindowStart(blockNum)
@@ -4284,6 +4343,13 @@ func (bc *BlockChain) CalculateCacheWindowStart(blockNum uint64) uint64 {
 // IsCompactCacheWarm reports whether we've already refilled the cache for the
 // current window by processing a full witness (after startup or the latest slide).
 func (bc *BlockChain) IsCompactCacheWarm() bool {
+	bc.cacheLock.RLock()
+	defer bc.cacheLock.RUnlock()
+	// If cacheWindowStart is 0, the cache hasn't been initialized yet (e.g., after restart).
+	// In this case, the cache is definitely not warm.
+	if bc.cacheWindowStart == 0 {
+		return false
+	}
 	return bc.compactCacheWarm.Load()
 }
 
@@ -4348,6 +4414,8 @@ func (bc *BlockChain) updateSlidingWindowCache(blockNum uint64, originalWitnessS
 	cachedToActive := 0
 	cachedToNext := 0
 
+	originalStateCount := len(originalWitnessStates)
+	executionNodesProcessed := 0
 	// Cache ORIGINAL witness state nodes (not merged states).
 	// This ensures we only cache new states for this block, not re-cache old states.
@@ -4378,6 +4446,7 @@ func (bc *BlockChain) updateSlidingWindowCache(blockNum uint64, originalWitnessS
 	subset.ForEachWithOrder(func(path string, n *trienode.Node) {
 		if !n.IsDeleted() && n.Blob != nil {
 			stateNode := string(n.Blob)
+			executionNodesProcessed++
 			// Add to active map
 			if _, exists := bc.activeCacheMap[stateNode]; !exists {
 				bc.activeCacheMap[stateNode] = struct{}{}
@@ -4405,6 +4474,8 @@ func (bc *BlockChain) updateSlidingWindowCache(blockNum uint64, originalWitnessS
 		"block", blockNum,
 		"windowStart", bc.cacheWindowStart,
 		"inOverlap", inOverlapPeriod,
+		"originalWitnessStates", originalStateCount,
+		"executionNodesProcessed", executionNodesProcessed,
 		"cachedToActive", cachedToActive,
 		"cachedToNext", cachedToNext,
 		"activeMapSize", len(bc.activeCacheMap),
@@ -4413,13 +4484,40 @@
 
 // ProcessBlockWithWitnesses processes a block in stateless mode using the provided witnesses.
 func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *stateless.Witness) (*state.StateDB, *ProcessResult, error) {
-	if witness == nil {
-		return nil, nil, errors.New("nil witness")
-	}
-
 	blockNum := block.Number().Uint64()
 	parallelMode := bc.parallelStatelessImportEnabled.Load()
 
+	hasWitness := witness != nil
+	incomingStateCount := 0
+	if hasWitness {
+		incomingStateCount = len(witness.State)
+	}
+	var witnessBytes int
+	if hasWitness {
+		var buf bytes.Buffer
+		if err := witness.EncodeRLP(&buf); err == nil {
+			witnessBytes = buf.Len()
+		} else {
+			log.Warn("Failed to measure witness payload size", "block", blockNum, "err", err)
+		}
+	}
+
+	activeCacheSize, nextCacheSize := bc.GetCompactCacheSizes()
+
+	log.Debug("Processing block with witness",
+		"block", blockNum,
+		"parallelMode", parallelMode,
+		"hasWitness", hasWitness,
+		"incomingStates", incomingStateCount,
+		"payloadBytes", witnessBytes,
+		"activeCacheSize", activeCacheSize,
+		"nextCacheSize", nextCacheSize,
+		"windowStart", bc.GetCacheWindowStart())
+
+	if !hasWitness {
+		return nil, nil, errors.New("nil witness")
+	}
+
 	// Track original witness states before any merging (for cache update later)
 	// We only want to cache NEW states from this block, not re-cache merged states
 	var originalWitnessStates map[string]struct{}
@@ -4442,11 +4540,15 @@ func (bc *BlockChain) ProcessBlockWithWitnesses(block *types.Block, witness *sta
 		witnessStatesBefore := len(witness.State)
 		mergedCount := bc.mergeSpanCacheIntoWitness(witness)
 		if mergedCount > 0 {
+			activeSize, nextSize := bc.GetCompactCacheSizes()
 			log.Debug("Merged cached states into witness",
 				"block", blockNum,
 				"witnessStatesBefore", witnessStatesBefore,
 				"mergedFromCache", mergedCount,
-				"witnessStatesAfter", len(witness.State))
+				"witnessStatesAfter", len(witness.State),
+				"activeCacheSize", activeSize,
+				"nextCacheSize", nextSize,
+				"windowStart", bc.GetCacheWindowStart())
 		}
 	}
 }
diff --git a/eth/downloader/bor_downloader.go b/eth/downloader/bor_downloader.go
index bc617f559d..587dac2875 100644
--- a/eth/downloader/bor_downloader.go
+++ b/eth/downloader/bor_downloader.go
@@ -244,6 +244,13 @@ type BlockChain interface {
 	// TrieDB retrieves the low level trie database used for interacting
 	// with trie nodes.
 	TrieDB() *triedb.Database
+
+	// Compact witness cache methods
+	IsCompactCacheWarm() bool
+	GetCacheWindowStart() uint64
+	GetCacheWindowSize() uint64
+	GetCacheOverlapSize() uint64
+	IsParallelStatelessImportEnabled() bool
 }
 
 // New creates a new downloader to fetch hashes and blocks from remote peers.
@@ -2312,10 +2319,66 @@ func (d *Downloader) processFullSyncContentStateless() error {
 			// Otherwise continue waiting for results
 			continue
 		}
-		// Process the retrieved block segments
-		if err := d.importBlockResultsStateless(results); err != nil {
-			log.Warn("Block import failed", "err", err)
-			return err
+
+		// Optimization: when the cache is cold, limit the import batch size to the next window-start block.
+		// This ensures that after a window-start block is processed and the cache becomes warm,
+		// subsequent blocks can use compact witnesses instead of all being imported with full witnesses.
+		windowSize := d.blockchain.GetCacheWindowSize()
+		cacheWarm := d.blockchain.IsCompactCacheWarm()
+		maxBatchSize := maxResultsProcess
+
+		if !cacheWarm && windowSize > 0 && len(results) > 0 {
+			// Get the first block in the batch to calculate how many blocks remain until the next window-start
+			firstBlockNum := results[0].Header.Number.Uint64()
+			currentHead := d.blockchain.CurrentBlock()
+
+			if currentHead != nil {
+				currentHeadNum := currentHead.Number.Uint64()
+				// Calculate the next window-start block
+				nextWindowStart := ((currentHeadNum / windowSize) + 1) * windowSize
+
+				// Limit the batch to blocks up to (and including) the next window-start block.
+				// This minimizes wasted full witnesses by switching to compact as soon as the cache warms.
+				blocksUntilWindowStart := nextWindowStart - firstBlockNum + 1 // +1 to include the window-start block
+
+				if blocksUntilWindowStart > 0 && blocksUntilWindowStart < uint64(maxBatchSize) {
+					maxBatchSize = int(blocksUntilWindowStart)
+					log.Debug("Limiting import batch size when cache is cold",
+						"originalBatchSize", len(results),
+						"limitedBatchSize", maxBatchSize,
+						"windowSize", windowSize,
+						"currentHead", currentHeadNum,
+						"firstBlockInBatch", firstBlockNum,
+						"nextWindowStart", nextWindowStart,
+						"blocksUntilWindowStart", blocksUntilWindowStart,
+						"reason", "cache cold - limiting to next window-start to allow cache to warm")
+				} else if uint64(maxBatchSize) > windowSize {
+					// Fallback: if the window-start limit is not usable, cap the batch at windowSize as a conservative limit
+					maxBatchSize = int(windowSize)
+					log.Debug("Limiting import batch size when cache is cold (fallback)",
+						"originalBatchSize", len(results),
+						"limitedBatchSize", maxBatchSize,
+						"windowSize", windowSize,
+						"reason", "cache cold - limiting to windowSize to allow cache to warm")
+				}
+			}
+		}
+
+		// Split large batches into smaller ones if needed
+		for len(results) > 0 {
+			batchSize := len(results)
+			if batchSize > maxBatchSize {
+				batchSize = maxBatchSize
+			}
+
+			batch := results[:batchSize]
+			results = results[batchSize:]
+
+			// Process the retrieved block segments
+			if err := d.importBlockResultsStateless(batch); err != nil {
+				log.Warn("Block import failed", "err", err)
+				return err
+			}
 		}
 	}
 }
diff --git a/eth/downloader/bor_downloader_test.go b/eth/downloader/bor_downloader_test.go
index 5745c402b9..1f0720ae38 100644
--- a/eth/downloader/bor_downloader_test.go
+++ b/eth/downloader/bor_downloader_test.go
@@ -372,6 +372,11 @@ func (dlp *downloadTesterPeer) RequestWitnesses(hashes []common.Hash, sink chan
 	return nil, nil
 }
 
+// RequestWitnessesWithVerification implements Peer
+func (dlp *downloadTesterPeer) RequestWitnessesWithVerification(hashes []common.Hash, sink chan *eth.Response, verifyPageCount func(common.Hash, uint64, string) bool, useCompact []bool, useCompactDefault bool) (*eth.Request, error) {
+	return nil, nil
+}
+
 // SupportsWitness implements Peer
 func (dlp *downloadTesterPeer) SupportsWitness() bool {
 	return false
diff --git a/eth/downloader/bor_fetchers_concurrent_witnesses.go b/eth/downloader/bor_fetchers_concurrent_witnesses.go
index e6d9995976..2979c1fee4 100644
--- a/eth/downloader/bor_fetchers_concurrent_witnesses.go
+++ b/eth/downloader/bor_fetchers_concurrent_witnesses.go
@@ -23,11 +23,38 @@ import (
 	// Assuming witnesses are related to types
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/stateless"
 	"github.com/ethereum/go-ethereum/eth/protocols/eth"
 	"github.com/ethereum/go-ethereum/log"
 )
 
+// shouldRequestCompactWitnessForBlock determines if we should request a compact witness
+// for a given block number. It mirrors the decision made in eth/handler_eth.go, but
+// sources its inputs from the downloader's BlockChain interface.
+func shouldRequestCompactWitnessForBlock(d *Downloader, blockNum uint64) bool {
+	windowSize := d.blockchain.GetCacheWindowSize()
+	overlapSize := d.blockchain.GetCacheOverlapSize()
+	cacheWarm := d.blockchain.IsCompactCacheWarm()
+
+	// If the cache is cold (e.g., after restart), always request full witnesses.
+	// The cache must be warmed by processing a window-start block with a full witness
+	// before we can safely use compact witnesses.
+	// Note: We do NOT optimize by checking if currentHeadNum >= blockWindowStart because
+	// after a restart, the cache is lost even if the head is past the window start.
+	// We must rely solely on IsCompactCacheWarm(), which correctly returns false
+	// when cacheWindowStart == 0 (uninitialized after restart).
+
+	// Use the shared logic from the core package
+	return core.ShouldUseCompactWitness(
+		cacheWarm,
+		d.blockchain.IsParallelStatelessImportEnabled(),
+		blockNum,
+		windowSize,
+		overlapSize,
+	)
+}
+
 // witnessQueue implements typedQueue and is a type adapter between the generic
 // concurrent fetcher and the downloader.
 type witnessQueue Downloader
@@ -64,8 +91,12 @@ func (q *witnessQueue) updateCapacity(peer *peerConnection, items int, span time
 // from the download queue to the specified peer.
 // Note: This assumes a 'ReserveWitnesses' method exists or will be added to downloader.queue.
 func (q *witnessQueue) reserve(peer *peerConnection, items int) (*fetchRequest, bool, bool) {
-	// Assuming ReserveWitnesses returns *fetchRequest, bool, bool like ReserveBodies
-	return q.queue.ReserveWitnesses(peer, items) // Placeholder: Needs implementation in queue struct
+	// We don't limit requests here to avoid breaking the delivery mechanism.
+	// Instead, we rely on the import batch limiting in processFullSyncContentStateless to
+	// ensure we process a window-start block before importing too many blocks ahead.
+	// This is simpler and safer than modifying requests after they're reserved.
+	req, _, progress, throttle := q.queue.ReserveWitnesses(peer, items)
+	return req, progress, throttle
 }
 
 // unreserve is responsible for removing the current witness retrieval allocation
@@ -104,17 +135,45 @@ func (q *witnessQueue) request(peer *peerConnection, req *fetchRequest, resCh ch
 	peer.log.Trace("Requesting new batch of witnesses", "count", len(hashes), "from_hash", hashes[0])
 
-	// The implementation of Peer.RequestWitnesses (e.g., in eth/peer.go) is responsible
-	// for translating these hashes into appropriate wit protocol messages (e.g., GetWitnessRequest).
-	// This might involve grouping hashes or assuming protocol extensions.
-	return peer.peer.RequestWitnesses(hashes, resCh)
+	// Safety check: if the cache is cold and the batch starts too far ahead, log a warning.
+	// This is a fallback in case reserve() didn't catch it.
+	d := (*Downloader)(q)
+	if !d.blockchain.IsCompactCacheWarm() && len(req.Headers) > 0 {
+		currentHead := d.blockchain.CurrentBlock()
+		if currentHead != nil {
+			windowSize := d.blockchain.GetCacheWindowSize()
+			if windowSize > 0 {
+				currentHeadNum := currentHead.Number.Uint64()
+				nextWindowStart := ((currentHeadNum / windowSize) + 1) * windowSize
+				firstBlockNum := req.Headers[0].Number.Uint64()
+
+				if firstBlockNum > nextWindowStart {
+					// Requesting a witness for a block ahead of the next window-start while the cache is cold.
+					// This should have been limited by reserve(), so surface it for debugging.
+					log.Warn("Requesting witness ahead of next window-start with cold cache",
+						"firstBlock", firstBlockNum,
+						"nextWindowStart", nextWindowStart,
+						"currentHead", currentHeadNum)
+				}
+			}
+		}
+	}
+
+	// Determine useCompact for each block in the batch.
+	// This allows per-block determination of compact vs full witnesses within a single batch.
+	useCompact := make([]bool, len(req.Headers))
+	for i, header := range req.Headers {
+		blockNum := header.Number.Uint64()
+		useCompact[i] = shouldRequestCompactWitnessForBlock(d, blockNum)
+	}
+
+	// Use RequestWitnessesWithVerification with per-block useCompact decisions.
+	// Note: We pass nil for verifyPageCount since the downloader doesn't have access to the handler's verification callback.
+	// We pass false as the fallback useCompactDefault (not used when the useCompact slice is provided).
+	return peer.peer.RequestWitnessesWithVerification(hashes, resCh, nil, useCompact, false)
 }
 
 // deliver is responsible for taking a generic response packet from the concurrent
 // fetcher, unpacking the witness data (using wit protocol definitions) and delivering
 // it to the downloader's queue.
 func (q *witnessQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
-	log.Trace("Delivering witness response", "peer", peer.id)
+	log.Trace("Delivering witness response", "peer", peer.id, "responseType", fmt.Sprintf("%T", packet.Res))
 
 	// Check the actual response type. Should be a pointer to WitnessPacketRLPPacket.
 	witPacketData, ok := packet.Res.([]*stateless.Witness) // Expect pointer type
 	if !ok {
@@ -133,11 +192,11 @@ func (q *witnessQueue) deliver(peer *peerConnection, packet *eth.Response) (int,
 
 	switch {
 	case err == nil && numWitnesses == 0:
-		peer.log.Trace("Requested witnesses delivered (empty batch)")
+		peer.log.Trace("Requested witnesses delivered (empty batch)", "peer", peer.id)
 	case err == nil:
-		peer.log.Trace("Delivered new batch of witnesses", "count", numWitnesses, "accepted", accepted)
+		peer.log.Trace("Delivered new batch of witnesses", "peer", peer.id, "count", numWitnesses, "accepted", accepted)
 	default:
-		peer.log.Debug("Failed to deliver retrieved witnesses", "err", err)
+		peer.log.Trace("Failed to deliver retrieved witnesses", "peer", peer.id, "err", err, "count", numWitnesses)
 	}
 
 	return accepted, err
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 15a6a59760..93defa195c 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -65,6 +65,7 @@ type Peer interface {
 	RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error)
 	RequestReceipts([]common.Hash, chan *eth.Response) (*eth.Request, error)
 	RequestWitnesses([]common.Hash, chan *eth.Response) (*eth.Request, error)
+	RequestWitnessesWithVerification([]common.Hash, chan *eth.Response, func(common.Hash, uint64, string) bool, []bool, bool) (*eth.Request, error)
 
 	// SupportsWitness returns true if the peer supports the witness protocol
 	SupportsWitness() bool
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 75a02a53b0..548c1c1534 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -576,12 +576,40 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo
 
 // ReserveWitnesses reserves a set of witness fetches for the given peer, skipping
 // any previously failed downloads. Beside the next batch of needed fetches, it
-// also returns a flag indicating progress and whether the caller should throttle.
-func (q *queue) ReserveWitnesses(p *peerConnection, count int) (*fetchRequest, bool, bool) {
+// also returns the first block number in the reserved batch (if any) for optimization checks,
+// a flag indicating progress, and whether the caller should throttle.
+func (q *queue) ReserveWitnesses(p *peerConnection, count int) (*fetchRequest, uint64, bool, bool) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
-	return q.reserveHeaders(p, count, q.witnessTaskPool, q.witnessTaskQueue, q.witnessPendPool, witnessType)
+	req, progress, throttle := q.reserveHeaders(p, count, q.witnessTaskPool, q.witnessTaskQueue, q.witnessPendPool, witnessType)
+
+	var firstBlockNum uint64
+	if req != nil && len(req.Headers) > 0 {
+		firstBlockNum = req.Headers[0].Number.Uint64()
+	}
+
+	return req, firstBlockNum, progress, throttle
+}
+
+// ReturnWitnessHeaders returns a slice of headers to the witnessTaskQueue.
+// This is used when a larger batch was reserved for peeking, but only a subset
+// could be processed due to cold cache or window limits.
+// If peerID is provided and removeFromPendPool is true, it also removes the request from pendPool.
+// Note: The caller should have already trimmed the request in pendPool before calling this.
+func (q *queue) ReturnWitnessHeaders(headers []*types.Header, peerID string, removeFromPendPool bool) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// If requested, remove the entire request from pendPool (used when returning nil from reserve)
+	if removeFromPendPool && peerID != "" {
+		delete(q.witnessPendPool, peerID)
+	}
+
+	// Return headers to the queue
+	for _, header := range headers {
+		q.witnessTaskQueue.Push(header, -int64(header.Number.Uint64()))
+	}
 }
 
 // reserveHeaders reserves a set of data download operations for a given peer,
@@ -1058,15 +1086,11 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, getReceip
 
 // DeliverWitnesses injects a witness retrieval response into the results cache.
 func (q *queue) DeliverWitnesses(id string, witnesses []*stateless.Witness, meta interface{}) (int, error) {
-	log.Debug("DeliverWitnesses: Entered", "peer", id)
-	defer log.Debug("DeliverWitnesses: Exiting", "peer", id)
+	log.Debug("DeliverWitnesses entered", "peer", id, "witnessCount", len(witnesses))
+	defer log.Debug("DeliverWitnesses exiting", "peer", id)
 
 	q.lock.Lock()
-	log.Trace("DeliverWitnesses: Acquired lock", "peer", id)
-	defer func() {
-		log.Trace("DeliverWitnesses: Releasing lock", "peer", id)
-		q.lock.Unlock()
-	}()
+	defer q.lock.Unlock()
 
 	// Ensure the peer has a pending request
 	request := q.witnessPendPool[id]
@@ -1076,7 +1100,7 @@ func (q *queue) DeliverWitnesses(id string, witnesses []*stateless.Witness, meta
 		return 0, errNoFetchesPending
 	}
 
-	log.Debug("DeliverWitnesses: Received witnesses", "peer", id, "count", len(witnesses))
+	log.Debug("DeliverWitnesses: Found pending request", "peer", id, "count", len(witnesses), "reqHeaders", len(request.Headers))
 
 	// Mark incoming data and time
 	witnessInMeter.Mark(int64(len(witnesses)))
@@ -1101,7 +1125,7 @@ func (q *queue) DeliverWitnesses(id string, witnesses []*stateless.Witness, meta
 	}
 
 	// Call the generic deliver mechanism
-	log.Debug("DeliverWitnesses: Calling generic deliver", "peer", id, "reqHeaders", len(request.Headers))
+	log.Debug("DeliverWitnesses: Calling generic deliver", "peer", id, "reqHeaders", len(request.Headers), "witnessCount", len(witnesses))
 	acceptedCount, err := q.deliver(id, q.witnessTaskPool, q.witnessTaskQueue, q.witnessPendPool,
 		witnessReqTimer, witnessInMeter, witnessDropMeter, len(witnesses), // Pass the count of received RLP items
diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go
index d7c4bc2581..1b59e04820 100644
--- a/eth/downloader/skeleton_test.go
+++ b/eth/downloader/skeleton_test.go
@@ -215,6 +215,10 @@ func (p *skeletonTestPeer) RequestWitnesses([]common.Hash, chan *eth.Response) (
 	panic("skeleton sync must not request witnesses")
 }
 
+func (p *skeletonTestPeer) RequestWitnessesWithVerification([]common.Hash, chan *eth.Response, func(common.Hash, uint64, string) bool, []bool, bool) (*eth.Request, error) {
+	panic("skeleton sync must not request witnesses")
+}
+
 func (p *skeletonTestPeer) SupportsWitness() bool {
 	return false
diff --git a/eth/fetcher/witness_manager.go b/eth/fetcher/witness_manager.go
index 1edab0b6e1..44d70dff1a 100644
--- a/eth/fetcher/witness_manager.go
+++ b/eth/fetcher/witness_manager.go
@@ -514,10 +514,6 @@ func (m *witnessManager) tick() {
 func (m *witnessManager) fetchWitness(peer string, hash common.Hash, announce *blockAnnounce) {
 	resCh := make(chan *eth.Response)
 
-	m.mu.Lock()
-	announcedAt := announce.time // Capture the original 'ready-to-fetch' time for logging/timestamping
-	m.mu.Unlock()
-
 	witnessFetchMeter.Mark(1)
 
 	req, err := announce.fetchWitness(hash, resCh)
@@ -585,7 +581,7 @@ func (m *witnessManager) fetchWitness(peer string, hash common.Hash, announce *b
 		}
 
 		// Process successful fetch
-		m.handleWitnessFetchSuccess(peer, hash, witness[0], announcedAt)
+		m.handleWitnessFetchSuccess(peer, hash, witness[0], announce.time)
 
 	case <-timeout.C:
 		log.Info("[wm] Witness fetch timed out for peer", "peer", peer, "hash", hash)
diff --git a/eth/handler_eth.go b/eth/handler_eth.go
index 7d7e47f080..0614ce1278 100644
--- a/eth/handler_eth.go
+++ b/eth/handler_eth.go
@@ -135,7 +135,7 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash,
 		useCompact := h.shouldRequestCompactWitness(number)
 
 		// Request witnesses (compact or full) using the wit peer with verification
-		return ethPeer.RequestWitnessesWithVerification([]common.Hash{hash}, sink, h.verifyPageCount, useCompact)
+		return ethPeer.RequestWitnessesWithVerification([]common.Hash{hash}, sink, h.verifyPageCount, []bool{useCompact}, false)
 	}
 
 	h.blockFetcher.Notify(peer.ID(), hash, number, time.Now(), peer.RequestOneHeader, peer.RequestBodies, witnessRequester)
@@ -156,67 +156,23 @@ func (h *ethHandler) shouldRequestCompactWitness(blockNum uint64) bool {
 	overlapSize := h.chain.GetCacheOverlapSize()
 	cacheWarm := h.chain.IsCompactCacheWarm()
 
-	// Optimization: If cache is cold but we're past the window-start block,
-	// check if the window-start block has already been processed.
-	// This handles the case where blocks are queued before the window-start is processed.
-	if !cacheWarm && windowSize > 0 {
-		expectedWindowStart := (blockNum / windowSize) * windowSize
-		if blockNum > expectedWindowStart {
-			// Check if the window-start block has been processed
-			if header := h.chain.GetHeaderByNumber(expectedWindowStart); header != nil {
-				// Window-start block exists, cache should be warm for this window
-				cacheWarm = true
-			}
-		}
-	}
+	// If the cache is cold (e.g., after restart), always request full witnesses.
+	// The cache must be warmed by processing a window-start block with a full witness
+	// before we can safely use compact witnesses.
+	// Note: We do NOT optimize by checking if currentHeadNum >= blockWindowStart because
+	// after a restart, the cache is lost even if the head is past the window start.
+	// We must rely solely on IsCompactCacheWarm(), which correctly returns false
+	// when cacheWindowStart == 0 (uninitialized after restart).
 
-	return shouldUseCompactWitness(
+	result := core.ShouldUseCompactWitness(
 		cacheWarm,
 		h.chain.IsParallelStatelessImportEnabled(),
 		blockNum,
 		windowSize,
 		overlapSize,
 	)
-}
-
-func shouldUseCompactWitness(cacheWarm bool, parallel bool, blockNum, windowSize, overlapSize uint64) bool {
-	// Compact witness requires sequential block processing.
-	if parallel {
-		return false
-	}
-	// If the compact cache hasn't been warmed yet (e.g. node restarted mid-window),
-	// request full witnesses until we process the next window start.
-	if !cacheWarm {
-		return false
-	}
-	// Without a valid window, fall back to full witnesses.
-	if windowSize == 0 {
-		return false
-	}
-
-	// Calculate the consensus-aligned window start for the requested block.
-	expectedWindowStart := (blockNum / windowSize) * windowSize
-	if blockNum == expectedWindowStart {
-		return false
-	}
-
-	// Calculate the first block of the overlap region for this window.
-	if overlapSize == 0 {
-		return true
-	}
-	var overlapStartOffset uint64
-	if overlapSize >= windowSize {
-		overlapStartOffset = 0
-	} else {
-		overlapStartOffset = windowSize - overlapSize
-	}
-	blockAtOverlapStart := expectedWindowStart + overlapStartOffset
-	if blockNum == blockAtOverlapStart {
-		return false
-	}
-
-	// Block is within window (not at window start or overlap start), can use compact witness.
-	return true
+	return result
 }
 
 // verifyPageCount verifies the witness page count for a given block hash by
@@ -284,7 +240,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td
 		}
 
 		// Request witnesses (compact or full) using the wit peer with verification
-		return ethPeer.RequestWitnessesWithVerification([]common.Hash{hash}, sink, h.verifyPageCount, useCompact)
+		return ethPeer.RequestWitnessesWithVerification([]common.Hash{hash}, sink, h.verifyPageCount, []bool{useCompact}, false)
 	}
 
 	// Call the new fetcher method to inject the block
diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go
index 34dafa24b8..5a0d4309b2 100644
--- a/eth/handler_eth_test.go
+++ b/eth/handler_eth_test.go
@@ -785,7 +785,7 @@ func TestShouldUseCompactWitness(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got := shouldUseCompactWitness(tt.cacheWarm, tt.parallel, tt.blockNum, tt.windowSize, tt.overlapSize)
+			got := core.ShouldUseCompactWitness(tt.cacheWarm, tt.parallel, tt.blockNum, tt.windowSize, tt.overlapSize)
 			require.Equal(t, tt.expectCompact, got)
 		})
 	}
diff --git a/eth/handler_wit.go b/eth/handler_wit.go
index 7e7d0936f0..3e09d1f6f1 100644
--- a/eth/handler_wit.go
+++ b/eth/handler_wit.go
@@ -133,7 +133,14 @@ func (h *witHandler) handleWitnessHashesAnnounce(peer *wit.Peer, hashes []common
 // It now returns the data and error, rather than sending the reply directly.
 // The returned data is [][]byte, as rlp.RawValue is essentially []byte.
 func (h *witHandler) handleGetWitness(peer *wit.Peer, req *wit.GetWitnessPacket) (wit.WitnessPacketResponse, error) {
-	log.Debug("handleGetWitness processing request", "peer", peer.ID(), "reqID", req.RequestId, "witnessPages", len(req.WitnessPages))
+	activeSize, nextSize := h.chain.GetCompactCacheSizes()
+	log.Debug("handleGetWitness processing request",
+		"peer", peer.ID(),
+		"reqID", req.RequestId,
+		"witnessPages", len(req.WitnessPages),
+		"activeCacheSize", activeSize,
+		"nextCacheSize", nextSize,
+		"windowStart", h.chain.GetCacheWindowStart())
 	// list different witnesses to query
 	seen := make(map[common.Hash]struct{}, len(req.WitnessPages))
 	for _, witnessPage := range req.WitnessPages {
@@ -198,14 +205,30 @@ func (h *witHandler) handleGetWitness(peer *wit.Peer, req *wit.GetWitnessPacket)
 	}
 
 	// Return the collected RLP data
-	log.Debug("handleGetWitness returning witnesses pages", "peer", peer.ID(), "reqID", req.RequestId, "count", len(response))
+	activeSizeEnd, nextSizeEnd := h.chain.GetCompactCacheSizes()
+	log.Debug("handleGetWitness returning witness pages",
+		"peer", peer.ID(),
+		"reqID", req.RequestId,
+		"count", len(response),
+		"totalPayloadBytes", totalResponsePayloadDataAmount,
+		"activeCacheSize", activeSizeEnd,
+		"nextCacheSize", nextSizeEnd,
+		"windowStart", h.chain.GetCacheWindowStart())
 
 	return response, nil
 }
 
 // handleGetCompactWitness retrieves witnesses and filters them using the sliding window cache.
 // This reduces bandwidth by omitting state nodes that the receiver should already have cached.
 func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitnessPacket) (wit.WitnessPacketResponse, error) {
-	log.Debug("handleGetCompactWitness processing request", "peer", peer.ID(), "reqID", req.RequestId, "witnessPages", len(req.WitnessPages))
+	activeSize, nextSize := h.chain.GetCompactCacheSizes()
+	currentWindowStart := h.chain.GetCacheWindowStart()
+	log.Debug("handleGetCompactWitness processing request",
+		"peer", peer.ID(),
+		"reqID", req.RequestId,
+		"witnessPages", len(req.WitnessPages),
+		"activeCacheSize", activeSize,
+		"nextCacheSize", nextSize,
+		"windowStart", currentWindowStart)
 
 	// First, get the full witness data (same as regular handleGetWitness)
 	fullResponse, err := h.handleGetWitness(peer, req)
@@ -218,7 +241,8 @@ func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitness
 	if h.chain.IsParallelStatelessImportEnabled() {
 		log.Debug("Parallel import enabled, returning full witness instead of compact",
 			"peer", peer.ID(),
-			"reqID", req.RequestId)
+			"reqID", req.RequestId,
+			"reason", "parallel-stateless-import")
 
 		return fullResponse, nil
 	}
@@ -250,6 +274,10 @@ func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitness
 			blockNumbers[hash] = header.Number.Uint64()
 		}
 		if windowStart, data := rawdb.ReadCompactWitness(h.chain.DB(), hash); len(data) > 0 {
+			log.Debug("Loaded compact witness from disk",
+				"hash", hash,
+				"windowStart", windowStart,
+				"bytes", len(data))
 			compactDisk[hash] = &compactDiskEntry{
 				windowStart: windowStart,
 				data:        data,
@@ -262,6 +290,8 @@ func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitness
 
 	// Now filter each witness page by removing cached states
 	var filteredResponse wit.WitnessPacketResponse
+	totalOriginalBytes := 0
+	totalFilteredBytes := 0
 
 	for _, witnessPage := range fullResponse {
 		var witnessPageResponse wit.WitnessPageResponse
@@ -276,6 +306,10 @@ func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitness
 			continue
 		}
 
+		originalSize := len(witnessPage.Data)
+		totalOriginalBytes += originalSize
+
+		// Resolve the block number for this page so the expected window start can be derived
 		blockNum, haveBlockNum := blockNumbers[witnessPage.Hash]
 		var expectedWindowStart uint64
 		if haveBlockNum {
@@ -326,16 +360,20 @@ func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitness
 			if totalResponsePayloadDataAmount >= MaximumResponseSize {
 				return nil, errors.New("response exceeds maximum p2p payload size")
 			}
+			filteredSize := len(cachedFiltered)
+			totalFilteredBytes += filteredSize
 			continue
 		}
 
 		// No stored compact witness found (neither on disk nor in memory cache).
 		// Return full witness instead of filtering on-the-fly to avoid inconsistencies.
 		// On-the-fly filtering might produce different results than what was stored during import.
- log.Debug("PSP - No stored compact witness found, returning full witness", + filteredSize := originalSize // Full witness, no filtering + totalFilteredBytes += filteredSize + log.Debug("No stored compact witness found, returning full witness", "hash", witnessPage.Hash, - "blockNum", blockNum, - "windowStart", expectedWindowStart) + "page", witnessPage.Page, + "originalSize", originalSize) filteredResponse = append(filteredResponse, witnessPage) totalResponsePayloadDataAmount += len(witnessPage.Data) if totalResponsePayloadDataAmount >= MaximumResponseSize { @@ -347,24 +385,6 @@ func (h *witHandler) handleGetCompactWitness(peer *wit.Peer, req *wit.GetWitness return filteredResponse, nil } -// filterWitnessWithCache filters a witness by removing state nodes present in the sliding window cache. -func (h *witHandler) filterWitnessWithCache(witness *stateless.Witness) *stateless.Witness { - if witness == nil { - return nil - } - - // Use BlockChain's exported method to filter - filtered, originalStates, removed := h.chain.FilterWitnessWithSlidingCache(witness) - - log.Debug("Filtered witness state using cache", - "blockNum", witness.Header().Number, - "originalStates", originalStates, - "filteredStates", len(filtered.State), - "removed", removed) - - return filtered -} - // ClearStaleCompactWitnessCache removes cache entries that are no longer valid // due to sliding window movement. Should be called when cache window slides. func (h *witHandler) ClearStaleCompactWitnessCache(currentWindowStart uint64) { diff --git a/eth/peer.go b/eth/peer.go index 06551b2906..a580e202c3 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -176,7 +176,7 @@ func (p *ethPeer) SupportsWitness() bool { // RequestWitnesses implements downloader.Peer. // It requests witnesses using the wit protocol for the given block hashes. func (p *ethPeer) RequestWitnesses(hashes []common.Hash, dlResCh chan *eth.Response) (*eth.Request, error) { - return p.RequestWitnessesWithVerification(hashes, dlResCh, nil, false) // Default to full witness + return p.RequestWitnessesWithVerification(hashes, dlResCh, nil, nil, false) // Default to full witness } // RequestWitnessPageCount requests only the page count for a witness using the new metadata protocol. @@ -288,8 +288,10 @@ func (p *ethPeer) requestWitnessPageCountLegacy(hash common.Hash) (uint64, error } } -// RequestWitnessesWithVerification requests witnesses with optional page count verification -func (p *ethPeer) RequestWitnessesWithVerification(hashes []common.Hash, dlResCh chan *eth.Response, verifyPageCount func(common.Hash, uint64, string) bool, useCompact bool) (*eth.Request, error) { +// RequestWitnessesWithVerification requests witnesses with optional page count verification. +// useCompact is a slice where useCompact[i] determines whether to request compact witness for hashes[i]. +// If useCompact is nil or shorter than hashes, the fallback useCompactDefault is used for missing entries. +func (p *ethPeer) RequestWitnessesWithVerification(hashes []common.Hash, dlResCh chan *eth.Response, verifyPageCount func(common.Hash, uint64, string) bool, useCompact []bool, useCompactDefault bool) (*eth.Request, error) { if p.witPeer == nil { return nil, errors.New("witness peer not found") } @@ -307,7 +309,7 @@ func (p *ethPeer) RequestWitnessesWithVerification(hashes []common.Hash, dlResCh var buildRequestMu sync.RWMutex // Build all the initial requests synchronously. 
-	buildWitReqErr := p.buildWitnessRequests(hashes, &witReqs, &witReqsWg, witTotalPages, witTotalRequest, witReqResCh, witReqSem, &mapsMu, &buildRequestMu, failedRequests, useCompact)
+	buildWitReqErr := p.buildWitnessRequests(hashes, &witReqs, &witReqsWg, witTotalPages, witTotalRequest, witReqResCh, witReqSem, &mapsMu, &buildRequestMu, failedRequests, useCompact, useCompactDefault)
 	if buildWitReqErr != nil {
 		p.witPeer.Peer.Log().Error("Error in building witness requests", "peer", p.ID(), "err", buildWitReqErr)
 		return nil, buildWitReqErr
@@ -341,7 +343,7 @@ func (p *ethPeer) RequestWitnessesWithVerification(hashes []common.Hash, dlResCh
 	reconstructedWitness := make(map[common.Hash]*stateless.Witness)
 	var lastWitRes *wit.Response
 	for witRes := range witReqResCh {
-		p.receiveWitnessPage(witRes, receivedWitPages, reconstructedWitness, hashes, &witReqs, &witReqsWg, witTotalPages, witTotalRequest, witReqResCh, witReqSem, &mapsMu, &buildRequestMu, failedRequests, downloadPaused, verifyPageCount, useCompact)
+		p.receiveWitnessPage(witRes, receivedWitPages, reconstructedWitness, hashes, &witReqs, &witReqsWg, witTotalPages, witTotalRequest, witReqResCh, witReqSem, &mapsMu, &buildRequestMu, failedRequests, downloadPaused, verifyPageCount, useCompact, useCompactDefault)
 		<-witReqSem
 
 		// Check if the Response is nil before accessing the Done channel.
@@ -447,7 +449,8 @@ func (p *ethPeer) receiveWitnessPage(
 	failedRequests map[common.Hash]map[uint64]witReqRetryCount,
 	downloadPaused map[common.Hash]bool,
 	verifyPageCount func(common.Hash, uint64, string) bool,
-	useCompact bool,
+	useCompact []bool,
+	useCompactDefault bool,
 ) (retrievedError error) {
 	defer func() {
 		// if fails map on retry count and request again
@@ -467,7 +470,7 @@
 			// non blocking call to avoid race condition because of semaphore
 			witReqsWg.Add(1) // protecting from not finishing before requests are built
 			go func() {
-				buildWitReqErr := p.buildWitnessRequests(hashes, witReqs, witReqsWg, witTotalPages, witTotalRequest, witResCh, witReqSem, mapsMu, buildRequestMu, failedRequests, useCompact)
+				buildWitReqErr := p.buildWitnessRequests(hashes, witReqs, witReqsWg, witTotalPages, witTotalRequest, witResCh, witReqSem, mapsMu, buildRequestMu, failedRequests, useCompact, useCompactDefault)
 				if buildWitReqErr != nil {
 					p.witPeer.Peer.Log().Error("Error in building witness requests", "peer", p.ID(), "err", buildWitReqErr)
 				}
@@ -567,7 +570,7 @@
 			// non blocking call to avoid race condition because of semaphore
 			witReqsWg.Add(1) // protecting from not finishing before requests are built
 			go func() {
-				buildWitReqErr := p.buildWitnessRequests(hashes, witReqs, witReqsWg, witTotalPages, witTotalRequest, witResCh, witReqSem, mapsMu, buildRequestMu, failedRequests, useCompact)
+				buildWitReqErr := p.buildWitnessRequests(hashes, witReqs, witReqsWg, witTotalPages, witTotalRequest, witResCh, witReqSem, mapsMu, buildRequestMu, failedRequests, useCompact, useCompactDefault)
 				if buildWitReqErr != nil {
 					p.witPeer.Peer.Log().Error("Error in building witness requests", "peer", p.ID(), "err", buildWitReqErr)
 				}
@@ -606,11 +609,28 @@ func (p *ethPeer) buildWitnessRequests(hashes []common.Hash,
 	mapsMu *sync.RWMutex,
 	buildRequestMu *sync.RWMutex,
 	failedRequests map[common.Hash]map[uint64]witReqRetryCount,
-	useCompact bool,
+	useCompact []bool,
+	useCompactDefault bool,
 ) error {
 	buildRequestMu.Lock()
 	defer buildRequestMu.Unlock()
 
+	// Build hashes list for logging
+	hashesList := make([]string, 0, len(hashes))
+	for _, h := range hashes {
+		hashesList = append(hashesList, h.Hex()[:16])
+	}
+	p.witPeer.Peer.Log().Trace("Building witness requests", "count", len(hashes), "hashes", hashesList)
+
+	// Build useCompact map from slice, using default for missing entries
+	useCompactMap := make(map[common.Hash]bool, len(hashes))
+	for i, hash := range hashes {
+		if i < len(useCompact) {
+			useCompactMap[hash] = useCompact[i]
+		} else {
+			useCompactMap[hash] = useCompactDefault
+		}
+	}
+
 	//checking requests to be done
 	for _, hash := range hashes {
 		mapsMu.RLock()
@@ -621,6 +641,7 @@ func (p *ethPeer) buildWitnessRequests(hashes []common.Hash,
 			end = DefaultPagesRequestPerWitness
 		}
 
+		hashUseCompact := useCompactMap[hash]
 		for page := start; page < end; page++ {
 			if err := p.doWitnessRequest(
 				hash,
@@ -631,7 +652,7 @@ func (p *ethPeer) buildWitnessRequests(hashes []common.Hash,
 				witReqSem,
 				mapsMu,
 				witTotalRequest,
-				useCompact,
+				hashUseCompact,
 			); err != nil {
 				return err
 			}
@@ -640,6 +661,7 @@ func (p *ethPeer) buildWitnessRequests(hashes []common.Hash,
 
 	// checking failed requests to retry
 	for hash, _ := range failedRequests {
+		hashUseCompact := useCompactMap[hash]
 		for page, _ := range failedRequests[hash] {
 			retryCount := failedRequests[hash][page]
 			if retryCount.ShouldRetryAgain {
@@ -652,7 +674,7 @@ func (p *ethPeer) buildWitnessRequests(hashes []common.Hash,
 				witReqSem,
 				mapsMu,
 				witTotalRequest,
-				useCompact,
+				hashUseCompact,
 			); err != nil {
 				return err
 			}