diff --git a/services/blockassembly/subtreeprocessor/SubtreeProcessor.go b/services/blockassembly/subtreeprocessor/SubtreeProcessor.go index 5b40a20415..5c1671011c 100644 --- a/services/blockassembly/subtreeprocessor/SubtreeProcessor.go +++ b/services/blockassembly/subtreeprocessor/SubtreeProcessor.go @@ -1722,6 +1722,172 @@ func (stp *SubtreeProcessor) processCompleteSubtree(skipNotification bool) (err return nil } +// bulkBuildSubtrees builds subtrees from a flat node slice using parallel construction. +// It fills any existing currentSubtree first, then builds additional full subtrees in parallel, +// and places any remaining nodes into a new currentSubtree. +// +// This replaces per-node addNode calls during reorg operations, eliminating mutex overhead +// from AddSubtreeNode and avoiding processCompleteSubtree channel sends for temporary subtrees. +// +// Parameters: +// - nodes: Flat slice of non-coinbase transaction nodes to build into subtrees +// - subtreeSize: Target number of nodes per subtree (including coinbase for first) +func (stp *SubtreeProcessor) bulkBuildSubtrees(nodes []subtreepkg.Node, subtreeSize int) error { + if len(nodes) == 0 { + return nil + } + + nodeIdx := 0 + + // Phase 1: Fill existing currentSubtree (which may already have coinbase + some nodes) + currentSt := stp.currentSubtree.Load() + if currentSt != nil { + currentLen := len(currentSt.Nodes) + remaining := subtreeSize - currentLen + if remaining > 0 { + fillCount := min(remaining, len(nodes)) + for _, node := range nodes[:fillCount] { + if err := currentSt.AddNode(node.Hash, node.Fee, node.SizeInBytes); err != nil { + return err + } + } + nodeIdx = fillCount + } + + if currentSt.IsComplete() { + // Move completed subtree to chainedSubtrees + stp.chainedSubtrees = append(stp.chainedSubtrees, currentSt) + currentSt = nil + } + } + + remainingNodes := nodes[nodeIdx:] + if len(remainingNodes) == 0 { + // All nodes fit in the existing currentSubtree + stp.updateChainedSubtreeCounts() 
+ return nil + } + + // Phase 2: Build full subtrees in parallel + numFullSubtrees := len(remainingNodes) / subtreeSize + lastCount := len(remainingNodes) % subtreeSize + + if numFullSubtrees > 0 { + fullSubtrees := make([]*subtreepkg.Subtree, numFullSubtrees) + g, _ := errgroup.WithContext(context.Background()) + + for i := 0; i < numFullSubtrees; i++ { + i := i + start := i * subtreeSize + end := start + subtreeSize + chunk := remainingNodes[start:end] + + g.Go(func() error { + st, err := subtreepkg.NewTreeByLeafCount(subtreeSize) + if err != nil { + return err + } + + for _, node := range chunk { + if err := st.AddNode(node.Hash, node.Fee, node.SizeInBytes); err != nil { + return err + } + } + + fullSubtrees[i] = st + return nil + }) + } + + if err := g.Wait(); err != nil { + return err + } + + stp.chainedSubtrees = append(stp.chainedSubtrees, fullSubtrees...) + } + + // Phase 3: Handle last partial subtree (becomes new currentSubtree) + lastStart := numFullSubtrees * subtreeSize + if lastCount > 0 { + st, err := subtreepkg.NewTreeByLeafCount(subtreeSize) + if err != nil { + return err + } + + for _, node := range remainingNodes[lastStart:] { + if err := st.AddNode(node.Hash, node.Fee, node.SizeInBytes); err != nil { + return err + } + } + + stp.currentSubtree.Store(st) + } else if currentSt == nil { + // All nodes consumed into full subtrees, create empty currentSubtree + st, err := subtreepkg.NewTreeByLeafCount(subtreeSize) + if err != nil { + return err + } + stp.currentSubtree.Store(st) + } + + stp.updateChainedSubtreeCounts() + return nil +} + +// updateChainedSubtreeCounts recalculates chainedSubtreeCount and chainedSubtreesTotalSize +// from the current chainedSubtrees slice. Used after bulk operations that modify chainedSubtrees directly. 
func (stp *SubtreeProcessor) updateChainedSubtreeCounts() {
	// Store the count and the recomputed total size so atomic readers see values
	// consistent with the slice after direct (non-addNode) mutation of chainedSubtrees.
	stp.chainedSubtreeCount.Store(int32(len(stp.chainedSubtrees)))
	var totalSize uint64
	for _, st := range stp.chainedSubtrees {
		totalSize += st.SizeInBytes
	}
	stp.chainedSubtreesTotalSize.Store(totalSize)
}

// finalizeBulkBuildSubtrees handles post-bulk-build bookkeeping for newly created subtrees.
// It updates the subtree node counts ring buffer and sends each subtree to newSubtreeChan.
// This should be called after bulkBuildSubtrees when subtrees need to be persisted/announced.
//
// Parameters:
//   - startIdx: index in chainedSubtrees where new subtrees start
//   - skipNotification: whether to skip network announcement
func (stp *SubtreeProcessor) finalizeBulkBuildSubtrees(startIdx int, skipNotification bool) {
	for i := startIdx; i < len(stp.chainedSubtrees); i++ {
		st := stp.chainedSubtrees[i]

		// Update ring buffer for subtree size auto-tuning
		actualNodeCount := len(st.Nodes)
		if actualNodeCount > 0 {
			stp.subtreeNodeCounts.Value = actualNodeCount
			stp.subtreeNodeCounts = stp.subtreeNodeCounts.Next()
		}

		stp.subtreesInBlock++

		// Send to newSubtreeChan for persistence.
		// errCh is unbuffered; the response is drained asynchronously below so this
		// loop does not block on persistence of each subtree.
		errCh := make(chan error)
		stp.newSubtreeChan <- NewSubtreeRequest{
			Subtree:          st,
			ParentTxMap:      stp.currentTxMap,
			SkipNotification: skipNotification,
			ErrChan:          errCh,
		}

		// NOTE(review): persistence errors are logged but not propagated to the
		// caller — confirm best-effort semantics are acceptable on this path.
		go func(hash string) {
			if err := <-errCh; err != nil {
				stp.logger.Errorf("[%s] error sending subtree to newSubtreeChan: %v", hash, err)
			}
		}(st.RootHash().String())
	}

	// Only reset the announcement ticker when something new was actually announced.
	if !skipNotification && startIdx < len(stp.chainedSubtrees) {
		stp.resetAnnouncementTicker()
	}

	stp.updatePrecomputedMiningData()
}

// AddBatch adds a batch of transaction nodes to the processor queue.
// // Parameters: @@ -2412,7 +2578,7 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []* // movedBackBlockTxMap keeps track of all the transactions that were in the blocks we moved back // this is used to determine which transactions need to be marked as on the longest chain when moving forward // if a transaction was in a block we moved back, it means it was on the longest chain before the reorg - movedBackBlockTxMap := make(map[chainhash.Hash]struct{}) // keeps track of all the transactions that were in the blocks we moved back + movedBackBlockTxMap := make(map[chainhash.Hash]bool) // keeps track of all the transactions that were in the blocks we moved back for _, block := range moveBackBlocks { // move back the block, getting all the transactions in the block and any conflicting hashes @@ -2433,7 +2599,7 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []* for _, subtreeNodes := range subtreesNodes { for _, node := range subtreeNodes { if !node.Hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) { - movedBackBlockTxMap[node.Hash] = struct{}{} + movedBackBlockTxMap[node.Hash] = true } } } @@ -2451,14 +2617,11 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []* } var ( - transactionMap *SplitSwissMap - losingTxHashesMap txmap.TxMap - // winningTxSet and losingTxSet track tx membership for dedup/filtering - winningTxSet = make(map[chainhash.Hash]struct{}) - losingTxSet = make(map[chainhash.Hash]struct{}) - // markOnLongestChain collects hashes that need to be marked as on longest chain; - // filtered inline to avoid a second pass + transactionMap *SplitSwissMap + losingTxHashesMap txmap.TxMap markOnLongestChain = make([]chainhash.Hash, 0, 1024) + winningTxSet = make(map[chainhash.Hash]bool) + rawLosingTxHashes = make([]chainhash.Hash, 0, 1024) ) for blockIdx, block := range moveForwardBlocks { @@ -2472,8 +2635,8 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx 
context.Context, moveBackBlocks []* if transactionMap != nil { transactionMap.Iter(func(hash chainhash.Hash, _ struct{}) bool { if !hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) { - winningTxSet[hash] = struct{}{} - if _, inMovedBack := movedBackBlockTxMap[hash]; !inMovedBack { + winningTxSet[hash] = true + if !movedBackBlockTxMap[hash] { markOnLongestChain = append(markOnLongestChain, hash) } } @@ -2482,14 +2645,8 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []* }) } - // Build losingTxSet directly from the map iterator, avoiding Keys() intermediate slice if losingTxHashesMap != nil && losingTxHashesMap.Length() > 0 { - losingTxHashesMap.Iter(func(hash chainhash.Hash, _ uint64) bool { - if _, isWinning := winningTxSet[hash]; !isWinning { - losingTxSet[hash] = struct{}{} - } - return true - }) + rawLosingTxHashes = append(rawLosingTxHashes, losingTxHashesMap.Keys()...) } stp.currentBlockHeader.Store(block.Header) @@ -2497,27 +2654,25 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []* movedBackBlockTxMap = nil // free up memory - // Build allLosingTxHashes directly from losingTxSet (already deduped, already filtered vs winningTxSet) - allLosingTxHashes := getHashSlice(len(losingTxSet)) - for hash := range losingTxSet { - *allLosingTxHashes = append(*allLosingTxHashes, hash) - } - - // Filter markOnLongestChain against losingTxSet in-place to avoid a second slice allocation - n := 0 - for _, hash := range markOnLongestChain { - if _, isLosing := losingTxSet[hash]; !isLosing { - markOnLongestChain[n] = hash - n++ + losingTxSet := make(map[chainhash.Hash]bool) + allLosingTxHashes := make([]chainhash.Hash, 0, len(rawLosingTxHashes)) + for _, hash := range rawLosingTxHashes { + if winningTxSet[hash] { + continue + } + if losingTxSet[hash] { + continue } + losingTxSet[hash] = true + allLosingTxHashes = append(allLosingTxHashes, hash) } - filteredMarkOnLongestChain := markOnLongestChain[:n] - // all 
the transactions in markOnLongestChain need to be marked as on the longest chain in the utxo store - if len(filteredMarkOnLongestChain) > 0 { - if err = stp.utxoStore.MarkTransactionsOnLongestChain(ctx, filteredMarkOnLongestChain, true); err != nil { - return errors.NewProcessingError("[reorgBlocks] error marking transactions as on longest chain in utxo store", err) + filteredMarkOnLongestChain := make([]chainhash.Hash, 0, len(markOnLongestChain)) + for _, hash := range markOnLongestChain { + if losingTxSet[hash] { + continue } + filteredMarkOnLongestChain = append(filteredMarkOnLongestChain, hash) } // Consolidate all "mark as false" operations into a single call @@ -2528,43 +2683,54 @@ func (stp *SubtreeProcessor) reorgBlocks(ctx context.Context, moveBackBlocks []* } currentSubtree := stp.currentSubtree.Load() subtreeNodeCount += len(currentSubtree.Nodes) + allMarkFalse := make([]chainhash.Hash, 0, len(allLosingTxHashes)+subtreeNodeCount) - allMarkFalse := getHashSlice(len(*allLosingTxHashes) + subtreeNodeCount) + allMarkFalse = append(allMarkFalse, allLosingTxHashes...) - // Add losing conflicting transactions - *allMarkFalse = append(*allMarkFalse, *allLosingTxHashes...) 
- putHashSlice(allLosingTxHashes) - - // Add everything in block assembly (not mined on the longest chain) for _, subtree := range stp.chainedSubtrees { for _, node := range subtree.Nodes { if !node.Hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) { - *allMarkFalse = append(*allMarkFalse, node.Hash) + allMarkFalse = append(allMarkFalse, node.Hash) } } } - for _, node := range currentSubtree.Nodes { if !node.Hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) { - *allMarkFalse = append(*allMarkFalse, node.Hash) + allMarkFalse = append(allMarkFalse, node.Hash) } } - // Make one consolidated call instead of separate calls - if len(*allMarkFalse) > 0 { - if err = stp.markNotOnLongestChain(ctx, moveBackBlocks, moveForwardBlocks, *allMarkFalse); err != nil { - putHashSlice(allMarkFalse) - return err - } + // Run mark(true) and mark(false) concurrently — they operate on disjoint tx hash sets + markGroup, markCtx := errgroup.WithContext(ctx) + + if len(filteredMarkOnLongestChain) > 0 { + markGroup.Go(func() error { + if err := stp.utxoStore.MarkTransactionsOnLongestChain(markCtx, filteredMarkOnLongestChain, true); err != nil { + return errors.NewProcessingError("[reorgBlocks] error marking transactions as on longest chain in utxo store", err) + } + return nil + }) + } + + if len(allMarkFalse) > 0 { + markGroup.Go(func() error { + return stp.markNotOnLongestChain(markCtx, moveBackBlocks, moveForwardBlocks, allMarkFalse) + }) + } + + if err = markGroup.Wait(); err != nil { + return err } - putHashSlice(allMarkFalse) // announce all the subtrees to the network // this will also store it by the Server in the subtree store - for _, subtree := range stp.chainedSubtrees { - errCh := make(chan error) - stp.newSubtreeChan <- NewSubtreeRequest{Subtree: subtree, ParentTxMap: stp.currentTxMap, ErrChan: errCh} - + // Send all subtrees in a batch, then wait for all responses (overlaps send/receive) + errChs := make([]chan error, len(stp.chainedSubtrees)) + for i, subtree := range 
stp.chainedSubtrees { + errChs[i] = make(chan error, 1) + stp.newSubtreeChan <- NewSubtreeRequest{Subtree: subtree, ParentTxMap: stp.currentTxMap, ErrChan: errChs[i]} + } + for _, errCh := range errChs { if err = <-errCh; err != nil { return errors.NewProcessingError("[reorgBlocks] error sending subtree to newSubtreeChan", err) } @@ -2638,7 +2804,7 @@ func (stp *SubtreeProcessor) checkMarkNotOnLongestChain(ctx context.Context, inv txMetas := make([]*meta.Data, len(markNotOnLongestChain)) g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(stp.settings.UtxoStore.GetBatcherSize * 2) + g.SetLimit(max(stp.settings.UtxoStore.MaxMinedRoutines, stp.settings.UtxoStore.GetBatcherSize*2)) // we need to check each transaction in the block we moved back and see if it is still on the longest chain or not for idx, hash := range markNotOnLongestChain { @@ -2759,13 +2925,9 @@ func (stp *SubtreeProcessor) moveBackBlock(ctx context.Context, block *model.Blo } } - // create new subtrees and add all the transactions from the block to it - if subtreesNodes, conflictingHashes, err = stp.moveBackBlockCreateNewSubtrees(ctx, block, createProperlySizedSubtrees); err != nil { - return nil, nil, err - } - - // add all the transactions from the previous state - if err = stp.moveBackBlockAddPreviousNodes(ctx, block, chainedSubtrees, lastIncompleteSubtree); err != nil { + // Bulk build: get block subtrees, collect all nodes, and build subtrees in parallel + // Bulk build: collect all nodes, then build subtrees in parallel + if subtreesNodes, conflictingHashes, err = stp.moveBackBlockBulkBuild(ctx, block, createProperlySizedSubtrees, chainedSubtrees, lastIncompleteSubtree); err != nil { return nil, nil, err } @@ -2781,94 +2943,121 @@ func (stp *SubtreeProcessor) moveBackBlock(ctx context.Context, block *model.Blo return subtreesNodes, conflictingHashes, nil } -func (stp *SubtreeProcessor) moveBackBlockAddPreviousNodes(ctx context.Context, block *model.Block, chainedSubtrees []*subtreepkg.Subtree, 
// moveBackBlockBulkBuild uses a single-pass bulk construction approach. Instead of calling
// addNode() per node (which acquires a mutex, checks IsComplete, and potentially calls
// processCompleteSubtree for each), this function:
//  1. Reads block subtrees from blob store (parallel, same as before)
//  2. Collects block nodes into a flat list, populating currentTxMap for new entries
//  3. Collects previous mempool nodes into the same flat list
//  4. Resets subtree state
//  5. Builds all subtrees in parallel using bulkBuildSubtrees
//
// This eliminates ~(N_block + N_mempool) mutex acquisitions per moveBack call.
//
// Returns the per-subtree node slices read from the block and any conflicting tx hashes.
func (stp *SubtreeProcessor) moveBackBlockBulkBuild(ctx context.Context, block *model.Block,
	createProperlySizedSubtrees bool,
	previousChainedSubtrees []*subtreepkg.Subtree,
	previousCurrentSubtree *subtreepkg.Subtree) ([][]subtreepkg.Node, []chainhash.Hash, error) {

	_, _, deferFn := tracing.Tracer("subtreeprocessor").Start(ctx, "moveBackBlockBulkBuild",
		tracing.WithLogMessage(stp.logger, "[moveBackBlock:BulkBuild][%s] with %d subtrees", block.String(), len(block.Subtrees)),
	)
	defer deferFn()

	// Step 1: Get block subtrees from blob store (parallel)
	subtreesNodes, subtreeMetaTxInpoints, conflictingHashes, err := stp.moveBackBlockGetSubtrees(ctx, block)
	if err != nil {
		return nil, nil, errors.NewProcessingError("[moveBackBlock:BulkBuild][%s] error getting subtrees", block.String(), err)
	}

	// Step 2: Estimate total node count for pre-allocation
	totalBlockNodes := 0
	for _, nodes := range subtreesNodes {
		totalBlockNodes += len(nodes)
	}
	totalPreviousNodes := 0
	for _, st := range previousChainedSubtrees {
		totalPreviousNodes += len(st.Nodes)
	}
	// NOTE(review): assumes previousCurrentSubtree is never nil — confirm at call sites.
	totalPreviousNodes += len(previousCurrentSubtree.Nodes)

	allNodes := make([]subtreepkg.Node, 0, totalBlockNodes+totalPreviousNodes)

	// Step 3: Collect block nodes and populate currentTxMap for new entries using bulk parallel insert.
	// Flatten block nodes (skipping coinbase) into parallel arrays for ParallelBulkSetIfNotExists.
	blockNodeCount := totalBlockNodes
	if len(subtreesNodes) > 0 {
		blockNodeCount-- // subtract coinbase from first subtree
	}

	flatHashes := make([]chainhash.Hash, 0, blockNodeCount)
	flatInpoints := make([]*subtreepkg.TxInpoints, 0, blockNodeCount)
	flatNodes := make([]subtreepkg.Node, 0, blockNodeCount)

	for idx, subtreeNodes := range subtreesNodes {
		startIdx := 0
		if idx == 0 {
			startIdx = 1 // skip coinbase in first subtree
		}
		for i := startIdx; i < len(subtreeNodes); i++ {
			flatHashes = append(flatHashes, subtreeNodes[i].Hash)
			flatInpoints = append(flatInpoints, &subtreeMetaTxInpoints[idx][i])
			flatNodes = append(flatNodes, subtreeNodes[i])
		}
	}

	if len(flatHashes) > 0 {
		// wasSet[i] is true when the hash was newly inserted; only those nodes are
		// added to allNodes — duplicates already tracked in currentTxMap are skipped.
		wasSet := make([]bool, len(flatHashes))
		if splitMap, ok := stp.currentTxMap.(*SplitTxInpointsMap); ok {
			// Fast path: bucket-parallel bulk insert.
			splitMap.ParallelBulkSetIfNotExists(flatHashes, flatInpoints, wasSet)
		} else {
			// Fallback for other TxMap implementations: sequential insert.
			for i, hash := range flatHashes {
				_, wasSet[i] = stp.currentTxMap.SetIfNotExists(hash, flatInpoints[i])
			}
		}
		for i, set := range wasSet {
			if set {
				allNodes = append(allNodes, flatNodes[i])
			}
		}
	}

	// Step 4: Collect previous mempool nodes (already in currentTxMap)
	for _, st := range previousChainedSubtrees {
		for _, node := range st.Nodes {
			if node.Hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) {
				continue
			}
			allNodes = append(allNodes, node)
		}
	}
	for _, node := range previousCurrentSubtree.Nodes {
		if node.Hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) {
			continue
		}
		allNodes = append(allNodes, node)
	}

	// Step 5: Determine subtree size
	subtreeSize := int(stp.currentItemsPerFile.Load())
	if !createProperlySizedSubtrees {
		// When moving forward afterwards we rebuild anyway, so use one large
		// subtree size to minimise fragmentation and small disk writes.
		subtreeSize = 1024 * 1024
	}

	// Step 6: Reset subtree state with coinbase in first subtree
	newSubtree, err := subtreepkg.NewTreeByLeafCount(subtreeSize)
	if err != nil {
		return nil, nil, errors.NewProcessingError("[moveBackBlock:BulkBuild][%s] error creating new subtree", block.String(), err)
	}
	_ = newSubtree.AddCoinbaseNode()
	stp.currentSubtree.Store(newSubtree)
	stp.chainedSubtrees = make([]*subtreepkg.Subtree, 0, ExpectedNumberOfSubtrees)
	stp.chainedSubtreeCount.Store(0)
	stp.chainedSubtreesTotalSize.Store(0)

	// Step 7: Bulk build subtrees from all collected nodes
	if err := stp.bulkBuildSubtrees(allNodes, subtreeSize); err != nil {
		return nil, nil, errors.NewProcessingError("[moveBackBlock:BulkBuild][%s] error bulk building subtrees", block.String(), err)
	}

	return subtreesNodes, conflictingHashes, nil
}
@@ -3655,21 +3844,12 @@ func (stp *SubtreeProcessor) processRemainderTxHashes(ctx context.Context, chain
 		return nil
 	}
 
-	// Pack 3 boolean flags per element into a single byte array:
-	// bit 0 = existedInTxMap, bit 1 = existsInLosingMap, bit 2 = isRemoveMap
-	// Saves ~66% memory vs three separate []bool arrays
-	const (
-		flagExistedInTxMap    = 1 << 0
-		flagExistsInLosingMap = 1 << 1
-		flagIsRemoveMap       = 1 << 2
-	)
-	nodeFlags := make([]byte, n)
-
-	numWorkers := min(runtime.NumCPU(), n/100, 16)
-	if numWorkers < 2 {
-		numWorkers = 2
-	}
+	// Pre-allocate result arrays indexed by position
+	existedInTxMap := make([]bool, n)    // true if SetIfExists found the key
+	existsInLosingMap := make([]bool, n) // true if in losingTxHashesMap
+	isRemoveMap := make([]bool, n)       // true if in removeMap (to delete)
 
+	numWorkers := max(min(runtime.NumCPU(), n/100, 16), 2)
 	chunkSize := (n + numWorkers - 1) / numWorkers
 
 	// Phase 1: Parallel SetIfExists + Exists lookups
 			}
 
 			if removeMapLength > 0 && stp.removeMap.Exists(node.Hash) {
-				nodeFlags[i] = flagIsRemoveMap
+				isRemoveMap[i] = true
 				continue
 			}
 
 			// SetIfExists: atomic check + set (1
lock instead of 2) existed := transactionMap.Exists(node.Hash) - if existed { - nodeFlags[i] = flagExistedInTxMap - } else if losingTxHashesMap != nil && losingTxHashesMap.Exists(node.Hash) { - nodeFlags[i] = flagExistsInLosingMap + existedInTxMap[i] = existed + + if !existed && losingTxHashesMap != nil { + existsInLosingMap[i] = losingTxHashesMap.Exists(node.Hash) } } }(start, end) @@ -3714,13 +3894,12 @@ func (stp *SubtreeProcessor) processRemainderTxHashes(ctx context.Context, chain continue } - f := nodeFlags[i] - if f&flagIsRemoveMap != 0 { + if isRemoveMap[i] { _ = stp.removeMap.Delete(node.Hash) continue } - if f&(flagExistedInTxMap|flagExistsInLosingMap) == 0 { + if !existedInTxMap[i] && !existsInLosingMap[i] { remainderSubtrees[idx] = append(remainderSubtrees[idx], node) } } @@ -3740,43 +3919,59 @@ func (stp *SubtreeProcessor) processRemainderTxHashes(ctx context.Context, chain totalNodes += len(subtreeNodes) } + // Phase 2: Populate new currentTxMap and collect inserted nodes for bulk build parallelThreshold := stp.settings.BlockAssembly.ParallelSetIfNotExistsThreshold - if totalNodes >= parallelThreshold { - // Parallel path: flatten to single slice, process in parallel, then add sequentially - flatNodes := make([]subtreepkg.Node, 0, totalNodes) - for _, subtreeNodes := range remainderSubtrees { - flatNodes = append(flatNodes, subtreeNodes...) - } + // Track how many chainedSubtrees existed before we start building + previousChainedCount := len(stp.chainedSubtrees) + + // Flatten all remainder nodes preserving order + flatNodes := make([]subtreepkg.Node, 0, totalNodes) + for _, subtreeNodes := range remainderSubtrees { + flatNodes = append(flatNodes, subtreeNodes...) 
+ } + var insertedNodes []subtreepkg.Node + + if totalNodes >= parallelThreshold { + // Parallel path: populate currentTxMap in parallel, then collect inserted nodes wasInserted := make([]bool, len(flatNodes)) - // Note: removeMapLength is 0 here because removeMap was already processed in Phase 1 if err := stp.parallelGetAndSetIfNotExists(flatNodes, currentTxMap, 0, stp.settings.BlockAssembly.ProcessRemainderTxHashesConcurrency, wasInserted); err != nil { return errors.NewProcessingError("[processRemainderTxHashes] parallel error", err) } + insertedNodes = make([]subtreepkg.Node, 0, totalNodes) for idx, node := range flatNodes { - if !wasInserted[idx] { - continue + if wasInserted[idx] { + insertedNodes = append(insertedNodes, node) } - _ = stp.addNodePreValidated(node, skipNotification) } } else { - // Sequential path for small remainder sets (existing behavior) - for _, subtreeNodes := range remainderSubtrees { - for _, node := range subtreeNodes { - parents, ok := currentTxMap.Get(node.Hash) - if !ok { - return errors.NewProcessingError("[processRemainderTxHashes] error getting node txInpoints from currentTxMap for %s", node.Hash.String()) - } + // Sequential path: populate currentTxMap and collect inserted nodes + insertedNodes = make([]subtreepkg.Node, 0, totalNodes) + for _, node := range flatNodes { + parents, ok := currentTxMap.Get(node.Hash) + if !ok { + return errors.NewProcessingError("[processRemainderTxHashes] error getting node txInpoints from currentTxMap for %s", node.Hash.String()) + } - _ = stp.addNode(node, parents, skipNotification) + if _, wasSet := stp.currentTxMap.SetIfNotExists(node.Hash, parents); wasSet { + insertedNodes = append(insertedNodes, node) } } } + // Phase 3: Bulk build subtrees from inserted nodes + subtreeSize := int(stp.currentItemsPerFile.Load()) + if err := stp.bulkBuildSubtrees(insertedNodes, subtreeSize); err != nil { + return errors.NewProcessingError("[processRemainderTxHashes] bulk build error", err) + } + + // Phase 
4: Finalize newly built subtrees (persistence + notifications) + stp.finalizeBulkBuildSubtrees(previousChainedCount, skipNotification) + return nil } diff --git a/services/blockassembly/subtreeprocessor/SubtreeProcessor_test.go b/services/blockassembly/subtreeprocessor/SubtreeProcessor_test.go index 3e73b1fb2d..94c73332a7 100644 --- a/services/blockassembly/subtreeprocessor/SubtreeProcessor_test.go +++ b/services/blockassembly/subtreeprocessor/SubtreeProcessor_test.go @@ -3410,7 +3410,7 @@ func TestRemoveCoinbaseUtxosChildrenRemoval(t *testing.T) { // the removal of child transactions when processing coinbase UTXOs through // the removeCoinbaseUtxos function integration. func TestMoveBackBlockChildrenRemoval(t *testing.T) { - t.Run("moveBackBlockCreateNewSubtrees_integration_with_child_removal", func(t *testing.T) { + t.Run("moveBackBlock_integration_with_child_removal", func(t *testing.T) { ctx := context.Background() // Setup test environment @@ -3434,7 +3434,10 @@ func TestMoveBackBlockChildrenRemoval(t *testing.T) { }() defer close(newSubtreeChan) - stp, err := NewSubtreeProcessor(ctx, ulogger.TestLogger{}, settings, blobStore, nil, utxoStore, newSubtreeChan) + blockchainClient := &blockchain.Mock{} + blockchainClient.On("SetBlockProcessedAt", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + stp, err := NewSubtreeProcessor(ctx, ulogger.TestLogger{}, settings, blobStore, blockchainClient, utxoStore, newSubtreeChan) require.NoError(t, err) // Use existing coinbase transaction from test data @@ -3444,7 +3447,7 @@ func TestMoveBackBlockChildrenRemoval(t *testing.T) { _, err = utxoStore.Create(ctx, coinbase, 1) require.NoError(t, err) - // Create block with empty subtrees (so moveBackBlockCreateNewSubtrees only calls removeCoinbaseUtxos) + // Create block with empty subtrees (so moveBackBlock only calls removeCoinbaseUtxos) block := &model.Block{ CoinbaseTx: coinbase, Header: &model.BlockHeader{ @@ -3458,9 +3461,9 @@ func 
TestMoveBackBlockChildrenRemoval(t *testing.T) { Subtrees: []*chainhash.Hash{}, // Empty to focus on removeCoinbaseUtxos call } - // Call moveBackBlockCreateNewSubtrees directly - _, _, err = stp.moveBackBlockCreateNewSubtrees(ctx, block, true) - require.NoError(t, err, "moveBackBlockCreateNewSubtrees should succeed") + // Call moveBackBlock directly + _, _, err = stp.moveBackBlock(ctx, block, true) + require.NoError(t, err, "moveBackBlock should succeed") }) } @@ -3800,17 +3803,26 @@ func TestRemoveCoinbaseUtxos_MissingTransaction(t *testing.T) { }) } -func TestMoveBackBlockCreateNewSubtrees_ErrorRecovery(t *testing.T) { - t.Run("moveBackBlockCreateNewSubtrees handles partial processing failures", func(t *testing.T) { +func TestMoveBackBlock_ErrorRecovery(t *testing.T) { + t.Run("moveBackBlock handles partial processing failures", func(t *testing.T) { newSubtreeChan := make(chan NewSubtreeRequest) settings := test.CreateBaseTestSettings(t) // Create a memory blob store for testing blobStore := blob_memory.New() - stp, _ := NewSubtreeProcessor(t.Context(), ulogger.TestLogger{}, settings, blobStore, nil, nil, newSubtreeChan) + blockchainClient := &blockchain.Mock{} + blockchainClient.On("SetBlockProcessedAt", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + ctx := t.Context() + utxoStoreURL, err := url.Parse("sqlitememory:///test") + require.NoError(t, err) + utxoStore, err := sql.New(ctx, ulogger.TestLogger{}, settings, utxoStoreURL) + require.NoError(t, err) + + stp, _ := NewSubtreeProcessor(ctx, ulogger.TestLogger{}, settings, blobStore, blockchainClient, utxoStore, newSubtreeChan) stp.InitCurrentBlockHeader(prevBlockHeader) - stp.Start(t.Context()) + stp.Start(ctx) // Create a block with subtrees but corrupted data subtreeHash := chainhash.HashH([]byte("subtree1")) @@ -3829,19 +3841,19 @@ func TestMoveBackBlockCreateNewSubtrees_ErrorRecovery(t *testing.T) { // Store corrupted subtree data in blob store corruptedData := 
[]byte("corrupted_subtree_data") - err := stp.subtreeStore.Set(context.Background(), subtreeHash[:], fileformat.FileTypeSubtree, corruptedData) + err = stp.subtreeStore.Set(context.Background(), subtreeHash[:], fileformat.FileTypeSubtree, corruptedData) require.NoError(t, err) // Capture state before operation originalState := captureSubtreeProcessorState(stp) - // Call moveBackBlockCreateNewSubtrees - _, _, err = stp.moveBackBlockCreateNewSubtrees(context.Background(), block, true) + // Call moveBackBlock + _, _, err = stp.moveBackBlock(context.Background(), block, true) // Should handle corrupted data gracefully or return appropriate error if err != nil { // If error occurred, verify state remains unchanged - assertStateUnchanged(t, stp, originalState, "moveBackBlockCreateNewSubtrees with corrupted data") + assertStateUnchanged(t, stp, originalState, "moveBackBlock with corrupted data") } }) } diff --git a/services/blockassembly/subtreeprocessor/hash_slice_pool.go b/services/blockassembly/subtreeprocessor/hash_slice_pool.go deleted file mode 100644 index 1973658fef..0000000000 --- a/services/blockassembly/subtreeprocessor/hash_slice_pool.go +++ /dev/null @@ -1,37 +0,0 @@ -package subtreeprocessor - -import ( - "sync" - - "github.com/bsv-blockchain/go-bt/v2/chainhash" -) - -// hashSlicePool pools []chainhash.Hash slices to reduce GC pressure during reorgs. -// Reorgs allocate large hash slices (potentially millions of 32-byte entries) that -// become garbage immediately after use. Pooling these avoids repeated multi-MB allocations. -var hashSlicePool = sync.Pool{} - -// getHashSlice returns a *[]chainhash.Hash from the pool or allocates a new one. -// The returned slice has length 0 and at least the requested capacity. 
-func getHashSlice(capacity int) *[]chainhash.Hash { - if v := hashSlicePool.Get(); v != nil { - s := v.(*[]chainhash.Hash) - if cap(*s) >= capacity { - *s = (*s)[:0] - return s - } - } - s := make([]chainhash.Hash, 0, capacity) - return &s -} - -// putHashSlice returns a slice to the pool for reuse. -func putHashSlice(s *[]chainhash.Hash) { - if s == nil { - return - } - // Clear references to allow GC of underlying hash data - clear(*s) - *s = (*s)[:0] - hashSlicePool.Put(s) -} diff --git a/services/blockassembly/subtreeprocessor/map.go b/services/blockassembly/subtreeprocessor/map.go index 2d08fdc508..db125c8fce 100644 --- a/services/blockassembly/subtreeprocessor/map.go +++ b/services/blockassembly/subtreeprocessor/map.go @@ -89,55 +89,133 @@ func (s *SplitSwissMap) Iter(f func(hash chainhash.Hash, v struct{}) bool) { } } +type txInpointsBucket struct { + mu sync.Mutex + m *swiss.Map[chainhash.Hash, *subtreepkg.TxInpoints] +} + type SplitTxInpointsMap struct { - m map[uint16]*txmap.SyncedMap[chainhash.Hash, *subtreepkg.TxInpoints] + buckets []txInpointsBucket nrOfBuckets uint16 } func NewSplitTxInpointsMap(nrOfBuckets uint16) *SplitTxInpointsMap { - m := make(map[uint16]*txmap.SyncedMap[chainhash.Hash, *subtreepkg.TxInpoints], nrOfBuckets) + buckets := make([]txInpointsBucket, nrOfBuckets) for i := uint16(0); i < nrOfBuckets; i++ { - m[i] = txmap.NewSyncedMap[chainhash.Hash, *subtreepkg.TxInpoints]() + buckets[i].m = swiss.NewMap[chainhash.Hash, *subtreepkg.TxInpoints](64) } return &SplitTxInpointsMap{ - m: m, + buckets: buckets, nrOfBuckets: nrOfBuckets, } } func (s *SplitTxInpointsMap) Delete(hash chainhash.Hash) bool { - return s.m[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)].Delete(hash) + b := &s.buckets[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)] + b.mu.Lock() + ok := b.m.Has(hash) + if ok { + b.m.Delete(hash) + } + b.mu.Unlock() + return ok } func (s *SplitTxInpointsMap) Exists(hash chainhash.Hash) bool { - return 
s.m[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)].Exists(hash) + b := &s.buckets[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)] + b.mu.Lock() + ok := b.m.Has(hash) + b.mu.Unlock() + return ok } func (s *SplitTxInpointsMap) Get(hash chainhash.Hash) (*subtreepkg.TxInpoints, bool) { - return s.m[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)].Get(hash) + b := &s.buckets[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)] + b.mu.Lock() + v, ok := b.m.Get(hash) + b.mu.Unlock() + return v, ok } func (s *SplitTxInpointsMap) Length() int { length := 0 - - for _, syncedMap := range s.m { - length += syncedMap.Length() + for i := uint16(0); i < s.nrOfBuckets; i++ { + b := &s.buckets[i] + b.mu.Lock() + length += b.m.Count() + b.mu.Unlock() } - return length } func (s *SplitTxInpointsMap) Set(hash chainhash.Hash, inpoints *subtreepkg.TxInpoints) { - s.m[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)].Set(hash, inpoints) + b := &s.buckets[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)] + b.mu.Lock() + b.m.Put(hash, inpoints) + b.mu.Unlock() } func (s *SplitTxInpointsMap) SetIfNotExists(hash chainhash.Hash, inpoints *subtreepkg.TxInpoints) (*subtreepkg.TxInpoints, bool) { - return s.m[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)].SetIfNotExists(hash, inpoints) + b := &s.buckets[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)] + b.mu.Lock() + if existing, ok := b.m.Get(hash); ok { + b.mu.Unlock() + return existing, false + } + b.m.Put(hash, inpoints) + b.mu.Unlock() + return inpoints, true } func (s *SplitTxInpointsMap) Clear() { - for _, syncedMap := range s.m { - syncedMap.Clear() + for i := uint16(0); i < s.nrOfBuckets; i++ { + b := &s.buckets[i] + b.mu.Lock() + b.m = swiss.NewMap[chainhash.Hash, *subtreepkg.TxInpoints](64) + b.mu.Unlock() + } +} + +// ParallelBulkSetIfNotExists inserts multiple entries in parallel, grouped by bucket. +// Each bucket is processed by a separate goroutine with a single lock acquisition. 
+// wasSet[i] is set to true if hashes[i] was newly inserted (not already present). +func (s *SplitTxInpointsMap) ParallelBulkSetIfNotExists( + hashes []chainhash.Hash, + inpoints []*subtreepkg.TxInpoints, + wasSet []bool, +) { + n := len(hashes) + if n == 0 { + return + } + + // Phase 1: Group indices by bucket (O(N), no locks) + bucketIndices := make([][]int, s.nrOfBuckets) + for i := 0; i < n; i++ { + bucket := txmap.Bytes2Uint16Buckets(hashes[i], s.nrOfBuckets) + bucketIndices[bucket] = append(bucketIndices[bucket], i) + } + + // Phase 2: Process each non-empty bucket in parallel (one lock per bucket) + var wg sync.WaitGroup + for bIdx := uint16(0); bIdx < s.nrOfBuckets; bIdx++ { + indices := bucketIndices[bIdx] + if len(indices) == 0 { + continue + } + wg.Add(1) + go func(b *txInpointsBucket, indices []int) { + defer wg.Done() + b.mu.Lock() + for _, idx := range indices { + if !b.m.Has(hashes[idx]) { + b.m.Put(hashes[idx], inpoints[idx]) + wasSet[idx] = true + } + } + b.mu.Unlock() + }(&s.buckets[bIdx], indices) } + wg.Wait() } diff --git a/services/blockassembly/subtreeprocessor/map_test.go b/services/blockassembly/subtreeprocessor/map_test.go new file mode 100644 index 0000000000..9873c87b52 --- /dev/null +++ b/services/blockassembly/subtreeprocessor/map_test.go @@ -0,0 +1,268 @@ +package subtreeprocessor + +import ( + "sync" + "testing" + + "github.com/bsv-blockchain/go-bt/v2/chainhash" + subtreepkg "github.com/bsv-blockchain/go-subtree" + "github.com/stretchr/testify/require" +) + +func makeHash(b byte) chainhash.Hash { + var h chainhash.Hash + h[0] = b + return h +} + +func makeInpoints(id byte) *subtreepkg.TxInpoints { + var h chainhash.Hash + h[0] = id + return &subtreepkg.TxInpoints{ParentTxHashes: []chainhash.Hash{h}} +} + +func TestSplitTxInpointsMap_SetIfNotExists(t *testing.T) { + m := NewSplitTxInpointsMap(16) + + h := makeHash(1) + inp := makeInpoints(1) + + // First insert should succeed + result, wasSet := m.SetIfNotExists(h, inp) + 
require.True(t, wasSet) + require.Equal(t, inp, result) + + // Second insert should return existing + inp2 := makeInpoints(2) + result, wasSet = m.SetIfNotExists(h, inp2) + require.False(t, wasSet) + require.Equal(t, inp, result) // original value returned +} + +func TestSplitTxInpointsMap_GetSetDelete(t *testing.T) { + m := NewSplitTxInpointsMap(16) + + h := makeHash(42) + inp := makeInpoints(42) + + // Get on empty map + _, ok := m.Get(h) + require.False(t, ok) + require.False(t, m.Exists(h)) + + // Set and Get + m.Set(h, inp) + got, ok := m.Get(h) + require.True(t, ok) + require.Equal(t, inp, got) + require.True(t, m.Exists(h)) + require.Equal(t, 1, m.Length()) + + // Delete + deleted := m.Delete(h) + require.True(t, deleted) + require.False(t, m.Exists(h)) + require.Equal(t, 0, m.Length()) + + // Delete non-existent + deleted = m.Delete(h) + require.False(t, deleted) +} + +func TestSplitTxInpointsMap_Clear(t *testing.T) { + m := NewSplitTxInpointsMap(16) + + for i := byte(0); i < 100; i++ { + m.Set(makeHash(i), makeInpoints(i)) + } + require.Equal(t, 100, m.Length()) + + m.Clear() + require.Equal(t, 0, m.Length()) + + // Verify all entries gone + for i := byte(0); i < 100; i++ { + require.False(t, m.Exists(makeHash(i))) + } +} + +func TestSplitTxInpointsMap_ConcurrentAccess(t *testing.T) { + m := NewSplitTxInpointsMap(256) + const n = 10000 + + // Concurrent writes + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + var h chainhash.Hash + h[0] = byte(idx) + h[1] = byte(idx >> 8) + m.Set(h, makeInpoints(byte(idx))) + }(i) + } + wg.Wait() + + // Concurrent reads + for i := 0; i < n; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + var h chainhash.Hash + h[0] = byte(idx) + h[1] = byte(idx >> 8) + _, _ = m.Get(h) + _ = m.Exists(h) + }(i) + } + wg.Wait() +} + +func TestSplitTxInpointsMap_BucketDistribution(t *testing.T) { + m := NewSplitTxInpointsMap(16) + + // Insert 1000 entries with varied hashes + for 
i := 0; i < 1000; i++ { + var h chainhash.Hash + h[0] = byte(i) + h[1] = byte(i >> 8) + h[2] = byte(i >> 16) + m.Set(h, makeInpoints(byte(i))) + } + + require.Equal(t, 1000, m.Length()) + + // Verify at least half the buckets got entries (basic distribution check) + bucketsUsed := 0 + for i := uint16(0); i < 16; i++ { + b := &m.buckets[i] + b.mu.Lock() + if b.m.Count() > 0 { + bucketsUsed++ + } + b.mu.Unlock() + } + require.Greater(t, bucketsUsed, 2, "hashes should distribute across multiple buckets") +} + +func TestParallelBulkSetIfNotExists_Correctness(t *testing.T) { + m := NewSplitTxInpointsMap(256) + const n = 10000 + + hashes := make([]chainhash.Hash, n) + inpoints := make([]*subtreepkg.TxInpoints, n) + for i := 0; i < n; i++ { + hashes[i][0] = byte(i) + hashes[i][1] = byte(i >> 8) + hashes[i][2] = byte(i >> 16) + inpoints[i] = makeInpoints(byte(i)) + } + + wasSet := make([]bool, n) + m.ParallelBulkSetIfNotExists(hashes, inpoints, wasSet) + + // All should have been set + for i := 0; i < n; i++ { + require.True(t, wasSet[i], "entry %d should have been set", i) + } + + // Verify all entries exist + require.Equal(t, n, m.Length()) + for i := 0; i < n; i++ { + got, ok := m.Get(hashes[i]) + require.True(t, ok) + require.Equal(t, inpoints[i], got) + } +} + +func TestParallelBulkSetIfNotExists_Duplicates(t *testing.T) { + m := NewSplitTxInpointsMap(256) + const n = 5000 + + // Pre-populate half the entries + hashes := make([]chainhash.Hash, n) + inpoints := make([]*subtreepkg.TxInpoints, n) + for i := 0; i < n; i++ { + hashes[i][0] = byte(i) + hashes[i][1] = byte(i >> 8) + inpoints[i] = makeInpoints(byte(i)) + } + + // Insert first half + for i := 0; i < n/2; i++ { + m.Set(hashes[i], inpoints[i]) + } + + // Bulk insert all — first half should be duplicates + newInpoints := make([]*subtreepkg.TxInpoints, n) + for i := 0; i < n; i++ { + newInpoints[i] = makeInpoints(byte(i + 100)) + } + + wasSet := make([]bool, n) + m.ParallelBulkSetIfNotExists(hashes, newInpoints, 
wasSet) + + // First half should NOT be set (duplicates) + for i := 0; i < n/2; i++ { + require.False(t, wasSet[i], "entry %d was pre-existing, should not be set", i) + // Original value should be preserved + got, _ := m.Get(hashes[i]) + require.Equal(t, inpoints[i], got) + } + + // Second half should be set + for i := n / 2; i < n; i++ { + require.True(t, wasSet[i], "entry %d should have been set", i) + } + + require.Equal(t, n, m.Length()) +} + +func TestParallelBulkSetIfNotExists_ConcurrentSafety(t *testing.T) { + m := NewSplitTxInpointsMap(256) + const n = 5000 + + hashes := make([]chainhash.Hash, n) + inpoints := make([]*subtreepkg.TxInpoints, n) + for i := 0; i < n; i++ { + hashes[i][0] = byte(i) + hashes[i][1] = byte(i >> 8) + inpoints[i] = makeInpoints(byte(i)) + } + + // Run two concurrent bulk inserts with overlapping keys + var wg sync.WaitGroup + wg.Add(2) + + wasSet1 := make([]bool, n) + wasSet2 := make([]bool, n) + + go func() { + defer wg.Done() + m.ParallelBulkSetIfNotExists(hashes, inpoints, wasSet1) + }() + go func() { + defer wg.Done() + m.ParallelBulkSetIfNotExists(hashes, inpoints, wasSet2) + }() + wg.Wait() + + // Each key should be set by exactly one caller + for i := 0; i < n; i++ { + // At least one must have set it + require.True(t, wasSet1[i] || wasSet2[i], "entry %d not set by either caller", i) + } + + require.Equal(t, n, m.Length()) +} + +func TestParallelBulkSetIfNotExists_Empty(t *testing.T) { + m := NewSplitTxInpointsMap(256) + + // Should not panic on empty input + m.ParallelBulkSetIfNotExists(nil, nil, nil) + m.ParallelBulkSetIfNotExists([]chainhash.Hash{}, []*subtreepkg.TxInpoints{}, []bool{}) + + require.Equal(t, 0, m.Length()) +} diff --git a/services/blockassembly/subtreeprocessor/reorg_alloc_benchmark_test.go b/services/blockassembly/subtreeprocessor/reorg_alloc_benchmark_test.go deleted file mode 100644 index 0e6582f695..0000000000 --- a/services/blockassembly/subtreeprocessor/reorg_alloc_benchmark_test.go +++ /dev/null @@ 
-1,355 +0,0 @@ -package subtreeprocessor - -import ( - "fmt" - "testing" - - "github.com/bsv-blockchain/go-bt/v2/chainhash" - subtreepkg "github.com/bsv-blockchain/go-subtree" -) - -// BenchmarkReorgOptimizations runs paired Old/New benchmarks for each optimization -// in the reorgBlocks path, showing the allocation and performance improvements. -// -// Run with: go test -bench BenchmarkReorgOptimizations -benchmem -count=3 -benchtime=50x ./services/blockassembly/subtreeprocessor/... -func BenchmarkReorgOptimizations(b *testing.B) { - scales := []struct { - name string - txCount int - }{ - {"10K", 10_000}, - {"100K", 100_000}, - } - - for _, scale := range scales { - b.Run(fmt.Sprintf("DedupFilterPipeline/Old/%s", scale.name), func(b *testing.B) { - benchmarkDedupFilterPipelineOld(b, scale.txCount) - }) - b.Run(fmt.Sprintf("DedupFilterPipeline/New/%s", scale.name), func(b *testing.B) { - benchmarkDedupFilterPipelineNew(b, scale.txCount) - }) - b.Run(fmt.Sprintf("AllMarkFalse/Old/%s", scale.name), func(b *testing.B) { - benchmarkAllMarkFalseOld(b, scale.txCount) - }) - b.Run(fmt.Sprintf("AllMarkFalse/New/%s", scale.name), func(b *testing.B) { - benchmarkAllMarkFalseNew(b, scale.txCount) - }) - b.Run(fmt.Sprintf("HashSlicePool/Old/%s", scale.name), func(b *testing.B) { - benchmarkHashSlicePoolOld(b, scale.txCount) - }) - b.Run(fmt.Sprintf("HashSlicePool/New/%s", scale.name), func(b *testing.B) { - benchmarkHashSlicePoolNew(b, scale.txCount) - }) - b.Run(fmt.Sprintf("NodeFlags/Old/%s", scale.name), func(b *testing.B) { - benchmarkNodeFlagsOld(b, scale.txCount) - }) - b.Run(fmt.Sprintf("NodeFlags/New/%s", scale.name), func(b *testing.B) { - benchmarkNodeFlagsNew(b, scale.txCount) - }) - } -} - -// --------------------------------------------------------------------------- -// 1. 
DedupFilterPipeline — the biggest win -// --------------------------------------------------------------------------- - -// Old pattern: Keys() intermediate slice, map[Hash]bool, separate filteredMarkOnLongestChain alloc -func benchmarkDedupFilterPipelineOld(b *testing.B, txCount int) { - winningHashes := generateHashes(txCount * 80 / 100) - losingHashes := generateHashesWithOffset(txCount*20/100, txCount*95/100) - movedBackHashes := generateHashesWithOffset(txCount*10/100, 0) - - movedBackSet := make(map[chainhash.Hash]struct{}, len(movedBackHashes)) - for _, h := range movedBackHashes { - movedBackSet[h] = struct{}{} - } - - // Simulate Keys() output — the old code called losingTxHashesMap.Keys() - // which returned a fresh []chainhash.Hash each iteration - losingKeys := make([]chainhash.Hash, len(losingHashes)) - copy(losingKeys, losingHashes) - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - winningTxSet := make(map[chainhash.Hash]struct{}, len(winningHashes)) - markOnLongestChain := make([]chainhash.Hash, 0, len(winningHashes)) - - for _, hash := range winningHashes { - winningTxSet[hash] = struct{}{} - if _, inMovedBack := movedBackSet[hash]; !inMovedBack { - markOnLongestChain = append(markOnLongestChain, hash) - } - } - - // OLD: Keys() allocates a fresh intermediate slice - rawLosingTxHashes := make([]chainhash.Hash, len(losingKeys)) - copy(rawLosingTxHashes, losingKeys) - - // OLD: map[chainhash.Hash]bool (1 byte per entry vs 0 for struct{}) - losingTxSet := make(map[chainhash.Hash]bool, len(rawLosingTxHashes)) - allLosingTxHashes := make([]chainhash.Hash, 0, len(rawLosingTxHashes)) - for _, hash := range rawLosingTxHashes { - if _, isWinning := winningTxSet[hash]; !isWinning { - if !losingTxSet[hash] { - losingTxSet[hash] = true - allLosingTxHashes = append(allLosingTxHashes, hash) - } - } - } - - // OLD: separate filteredMarkOnLongestChain allocation + copy pass - filteredMarkOnLongestChain := make([]chainhash.Hash, 0, 
len(markOnLongestChain)) - for _, hash := range markOnLongestChain { - if !losingTxSet[hash] { - filteredMarkOnLongestChain = append(filteredMarkOnLongestChain, hash) - } - } - - _ = allLosingTxHashes - _ = filteredMarkOnLongestChain - } -} - -// New pattern: Iter() avoids Keys(), map[Hash]struct{}, pooled slice, in-place filter -func benchmarkDedupFilterPipelineNew(b *testing.B, txCount int) { - winningHashes := generateHashes(txCount * 80 / 100) - losingHashes := generateHashesWithOffset(txCount*20/100, txCount*95/100) - movedBackHashes := generateHashesWithOffset(txCount*10/100, 0) - - movedBackSet := make(map[chainhash.Hash]struct{}, len(movedBackHashes)) - for _, h := range movedBackHashes { - movedBackSet[h] = struct{}{} - } - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - winningTxSet := make(map[chainhash.Hash]struct{}, len(winningHashes)) - losingTxSet := make(map[chainhash.Hash]struct{}, len(losingHashes)) - markOnLongestChain := make([]chainhash.Hash, 0, len(winningHashes)) - - for _, hash := range winningHashes { - winningTxSet[hash] = struct{}{} - if _, inMovedBack := movedBackSet[hash]; !inMovedBack { - markOnLongestChain = append(markOnLongestChain, hash) - } - } - - // NEW: iterate directly, no Keys() intermediate - for _, hash := range losingHashes { - if _, isWinning := winningTxSet[hash]; !isWinning { - losingTxSet[hash] = struct{}{} - } - } - - // NEW: pooled slice from losingTxSet - allLosing := getHashSlice(len(losingTxSet)) - for hash := range losingTxSet { - *allLosing = append(*allLosing, hash) - } - - // NEW: in-place filter — no second slice allocation - n := 0 - for _, hash := range markOnLongestChain { - if _, isLosing := losingTxSet[hash]; !isLosing { - markOnLongestChain[n] = hash - n++ - } - } - _ = markOnLongestChain[:n] - - putHashSlice(allLosing) - } -} - -// --------------------------------------------------------------------------- -// 2. 
AllMarkFalse construction — N+1 allocs/calls → 1 pooled alloc/call -// --------------------------------------------------------------------------- - -func buildSubtrees(b *testing.B, txCount int) []*subtreepkg.Subtree { - b.Helper() - // Use 1023 tx nodes per subtree + 1 coinbase = 1024 leaves (power of two required) - nodesPerSubtree := 1023 - numSubtrees := txCount / nodesPerSubtree - if numSubtrees < 1 { - numSubtrees = 1 - nodesPerSubtree = txCount - } - - subtrees := make([]*subtreepkg.Subtree, numSubtrees) - for s := 0; s < numSubtrees; s++ { - // leafCount must be power of two; nodesPerSubtree+1 = 1024 - leafCount := nextPowerOfTwo(nodesPerSubtree + 1) - st, err := subtreepkg.NewTreeByLeafCount(leafCount) - if err != nil { - b.Fatal(err) - } - _ = st.AddCoinbaseNode() - for i := 0; i < nodesPerSubtree; i++ { - hash := chainhash.HashH([]byte(fmt.Sprintf("bench-%d-%d", s, i))) - _ = st.AddSubtreeNode(subtreepkg.Node{Hash: hash, Fee: 100, SizeInBytes: 250}) - } - subtrees[s] = st - } - return subtrees -} - -func nextPowerOfTwo(n int) int { - n-- - n |= n >> 1 - n |= n >> 2 - n |= n >> 4 - n |= n >> 8 - n |= n >> 16 - n++ - return n -} - -// Old pattern: per-subtree make() + per-subtree store call -func benchmarkAllMarkFalseOld(b *testing.B, txCount int) { - subtrees := buildSubtrees(b, txCount) - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - // OLD: allocate per subtree, simulate per-subtree call - for _, st := range subtrees { - notOnLongestChain := make([]chainhash.Hash, 0, len(st.Nodes)) - for _, node := range st.Nodes { - if !node.Hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) { - notOnLongestChain = append(notOnLongestChain, node.Hash) - } - } - _ = len(notOnLongestChain) // simulate passing to store call - } - } -} - -// New pattern: single pooled alloc across all subtrees -func benchmarkAllMarkFalseNew(b *testing.B, txCount int) { - subtrees := buildSubtrees(b, txCount) - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < 
b.N; i++ { - totalLen := 0 - for _, st := range subtrees { - totalLen += len(st.Nodes) - } - - allMarkFalse := getHashSlice(totalLen) - for _, st := range subtrees { - for _, node := range st.Nodes { - if !node.Hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) { - *allMarkFalse = append(*allMarkFalse, node.Hash) - } - } - } - - _ = len(*allMarkFalse) - putHashSlice(allMarkFalse) - } -} - -// --------------------------------------------------------------------------- -// 3. HashSlicePool — fresh make() vs pooled get/put -// --------------------------------------------------------------------------- - -// Old pattern: fresh make() every time -func benchmarkHashSlicePoolOld(b *testing.B, txCount int) { - hashes := generateHashes(txCount) - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - s := make([]chainhash.Hash, 0, txCount) - s = append(s, hashes...) - _ = len(s) - } -} - -// New pattern: sync.Pool — 0 allocs after warmup -func benchmarkHashSlicePoolNew(b *testing.B, txCount int) { - hashes := generateHashes(txCount) - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - s := getHashSlice(txCount) - *s = append(*s, hashes...) - putHashSlice(s) - } -} - -// --------------------------------------------------------------------------- -// 4. 
NodeFlags — 3x bool arrays vs single byte bitset -// --------------------------------------------------------------------------- - -// Old pattern: three separate bool arrays (3 allocs, 3N bytes) -func benchmarkNodeFlagsOld(b *testing.B, txCount int) { - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - added := make([]bool, txCount) - removed := make([]bool, txCount) - modified := make([]bool, txCount) - // Simulate flag access - added[0] = true - removed[0] = true - modified[0] = true - _ = added[txCount-1] - _ = removed[txCount-1] - _ = modified[txCount-1] - } -} - -// New pattern: single byte slice with bit flags (1 alloc, N bytes) -func benchmarkNodeFlagsNew(b *testing.B, txCount int) { - const ( - flagAdded byte = 1 << 0 - flagRemoved byte = 1 << 1 - flagModified byte = 1 << 2 - ) - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - flags := make([]byte, txCount) - // Simulate flag access - flags[0] |= flagAdded - flags[0] |= flagRemoved - flags[0] |= flagModified - _ = flags[txCount-1] & flagAdded - _ = flags[txCount-1] & flagRemoved - _ = flags[txCount-1] & flagModified - } -} - -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - -func generateHashes(count int) []chainhash.Hash { - hashes := make([]chainhash.Hash, count) - for i := range hashes { - hashes[i] = chainhash.HashH([]byte(fmt.Sprintf("hash-%d", i))) - } - return hashes -} - -func generateHashesWithOffset(count, offset int) []chainhash.Hash { - hashes := make([]chainhash.Hash, count) - for i := range hashes { - hashes[i] = chainhash.HashH([]byte(fmt.Sprintf("hash-%d", i+offset))) - } - return hashes -} diff --git a/services/blockassembly/subtreeprocessor/reorg_benchmark_test.go b/services/blockassembly/subtreeprocessor/reorg_benchmark_test.go new file mode 100644 index 0000000000..c02ec63771 --- /dev/null +++ 
b/services/blockassembly/subtreeprocessor/reorg_benchmark_test.go @@ -0,0 +1,1051 @@ +package subtreeprocessor + +import ( + "context" + "fmt" + "net/url" + "runtime" + "testing" + "time" + + "github.com/bsv-blockchain/go-bt/v2" + "github.com/bsv-blockchain/go-bt/v2/chainhash" + subtreepkg "github.com/bsv-blockchain/go-subtree" + "github.com/bsv-blockchain/teranode/model" + "github.com/bsv-blockchain/teranode/pkg/fileformat" + "github.com/bsv-blockchain/teranode/services/blockchain" + blob_memory "github.com/bsv-blockchain/teranode/stores/blob/memory" + "github.com/bsv-blockchain/teranode/stores/blob/options" + "github.com/bsv-blockchain/teranode/stores/utxo/sql" + "github.com/bsv-blockchain/teranode/ulogger" + "github.com/bsv-blockchain/teranode/util/test" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +// reorgBenchState holds all the state needed for a reorg benchmark run. +// Reusable across iterations to avoid setup cost in b.N loops. +type reorgBenchState struct { + stp *SubtreeProcessor + subtreeStore *blob_memory.Memory + utxoStore *sql.Store + blockchainClient *blockchain.Mock + newSubtreeChan chan NewSubtreeRequest + + // Block being moved back (the "losing" block) + moveBackBlock *model.Block + + // Block being moved forward (the "winning" block) + moveForwardBlock *model.Block + + // Transactions in the mempool (not in either block) + mempoolNodes []subtreepkg.Node + mempoolParents []*subtreepkg.TxInpoints + subtreeSize int + mempoolTxCount int + blockTxCount int + overlapTxCount int + overlapFraction float64 +} + +// setupReorgBenchmark creates a fully initialized benchmark scenario: +// +// - A "losing" block with blockTxCount transactions across subtrees, stored in blob store +// - A "winning" block with blockTxCount transactions (overlapFraction shared with losing block) +// - A mempool with mempoolTxCount transactions already in the subtree processor +// +// The benchmark measures the full reorg path: 
moveBackBlock + moveForwardBlock. +func setupReorgBenchmark(b *testing.B, blockTxCount, mempoolTxCount int, subtreeSize int, overlapFraction float64) *reorgBenchState { + b.Helper() + + ctx := context.Background() + + // Create stores + subtreeStore := blob_memory.New() + + tSettings := test.CreateBaseTestSettings(b) + tSettings.BlockAssembly.InitialMerkleItemsPerSubtree = subtreeSize + tSettings.BlockAssembly.StoreTxInpointsForSubtreeMeta = true + + utxoStoreURL, err := url.Parse("sqlitememory:///test") + require.NoError(b, err) + + utxoStore, err := sql.New(ctx, ulogger.TestLogger{}, tSettings, utxoStoreURL) + require.NoError(b, err) + + newSubtreeChan := make(chan NewSubtreeRequest, 1024) + + // Drain subtree channel in background + go func() { + for req := range newSubtreeChan { + if req.ErrChan != nil { + req.ErrChan <- nil + } + } + }() + + mockBlockchainClient := &blockchain.Mock{} + mockBlockchainClient.On("GetBlocksMinedNotSet", mock.Anything).Return([]*model.Block{}, nil) + mockBlockchainClient.On("SetBlockProcessedAt", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mockBlockchainClient.On("GetBlockIsMined", mock.Anything, mock.Anything).Return(true, nil) + mockBlockchainClient.On("SetBlockProcessedAt", mock.Anything, mock.AnythingOfType("*chainhash.Hash"), mock.AnythingOfType("[]bool")).Return(nil) + + stp, err := NewSubtreeProcessor(ctx, ulogger.TestLogger{}, tSettings, subtreeStore, mockBlockchainClient, utxoStore, newSubtreeChan) + require.NoError(b, err) + + // Generate all transaction hashes upfront + overlapCount := int(float64(blockTxCount) * overlapFraction) + + // Shared transactions (in both the losing and winning blocks) + sharedTxHashes := make([]chainhash.Hash, overlapCount) + for i := range sharedTxHashes { + sharedTxHashes[i] = chainhash.HashH([]byte(fmt.Sprintf("shared-tx-%d", i))) + } + + // Losing block only transactions + losingOnlyCount := blockTxCount - overlapCount + losingOnlyTxHashes := make([]chainhash.Hash, 
losingOnlyCount) + for i := range losingOnlyTxHashes { + losingOnlyTxHashes[i] = chainhash.HashH([]byte(fmt.Sprintf("losing-tx-%d", i))) + } + + // Winning block only transactions + winningOnlyCount := blockTxCount - overlapCount + winningOnlyTxHashes := make([]chainhash.Hash, winningOnlyCount) + for i := range winningOnlyTxHashes { + winningOnlyTxHashes[i] = chainhash.HashH([]byte(fmt.Sprintf("winning-tx-%d", i))) + } + + // Mempool transactions (remainder after reorg) + mempoolTxHashes := make([]chainhash.Hash, mempoolTxCount) + for i := range mempoolTxHashes { + mempoolTxHashes[i] = chainhash.HashH([]byte(fmt.Sprintf("mempool-tx-%d", i))) + } + + // Build the losing block's subtrees and store them + losingBlockTxHashes := append(sharedTxHashes, losingOnlyTxHashes...) + losingSubtreeHashes := buildAndStoreSubtrees(b, subtreeStore, losingBlockTxHashes, subtreeSize) + + // Build the winning block's subtrees and store them + winningBlockTxHashes := append(sharedTxHashes, winningOnlyTxHashes...) 
+ winningSubtreeHashes := buildAndStoreSubtrees(b, subtreeStore, winningBlockTxHashes, subtreeSize) + + // Build block headers with proper chain linkage + genesisHeader := &model.BlockHeader{ + Version: 1, + HashPrevBlock: &chainhash.Hash{}, + HashMerkleRoot: &chainhash.Hash{}, + Timestamp: 1234567890, + Bits: model.NBit{}, + Nonce: 0, + } + + // The losing block extends genesis + losingBlockHeader := &model.BlockHeader{ + Version: 1, + HashPrevBlock: genesisHeader.Hash(), + HashMerkleRoot: &chainhash.Hash{}, + Timestamp: 1234567891, + Bits: model.NBit{}, + Nonce: 1, + } + + // The winning block also extends genesis (fork) + winningBlockHeader := &model.BlockHeader{ + Version: 1, + HashPrevBlock: genesisHeader.Hash(), + HashMerkleRoot: &chainhash.Hash{}, + Timestamp: 1234567892, + Bits: model.NBit{}, + Nonce: 2, + } + + // Mock the GetBlockHeader call for the parent of moveBack block + mockBlockchainClient.On("GetBlockHeader", mock.Anything, genesisHeader.Hash()).Return(genesisHeader, &model.BlockHeaderMeta{}, nil) + + // Store coinbase UTXO + _, err = utxoStore.Create(ctx, coinbaseTx, 1) + require.NoError(b, err) + + losingBlock := &model.Block{ + Height: 1, + ID: 1, + CoinbaseTx: coinbaseTx, + Subtrees: losingSubtreeHashes, + Header: losingBlockHeader, + TransactionCount: uint64(blockTxCount), + } + + winningBlock := &model.Block{ + Height: 1, + ID: 2, + CoinbaseTx: coinbaseTx2, + Subtrees: winningSubtreeHashes, + Header: winningBlockHeader, + TransactionCount: uint64(blockTxCount), + } + + // Prepare mempool nodes and parents for populating the subtree processor + mempoolNodes := make([]subtreepkg.Node, mempoolTxCount) + mempoolParents := make([]*subtreepkg.TxInpoints, mempoolTxCount) + parentHash := chainhash.HashH([]byte("parent-tx")) + for i, hash := range mempoolTxHashes { + mempoolNodes[i] = subtreepkg.Node{Hash: hash, Fee: 100, SizeInBytes: 250} + mempoolParents[i] = &subtreepkg.TxInpoints{ParentTxHashes: []chainhash.Hash{parentHash}} + } + + return 
&reorgBenchState{ + stp: stp, + subtreeStore: subtreeStore, + utxoStore: utxoStore, + blockchainClient: mockBlockchainClient, + newSubtreeChan: newSubtreeChan, + moveBackBlock: losingBlock, + moveForwardBlock: winningBlock, + mempoolNodes: mempoolNodes, + mempoolParents: mempoolParents, + subtreeSize: subtreeSize, + mempoolTxCount: mempoolTxCount, + blockTxCount: blockTxCount, + overlapTxCount: overlapCount, + overlapFraction: overlapFraction, + } +} + +// buildAndStoreSubtrees creates subtrees from the given transaction hashes, serializes them, +// stores them in the blob store, and returns the subtree root hashes for the block. +func buildAndStoreSubtrees(b *testing.B, store *blob_memory.Memory, txHashes []chainhash.Hash, subtreeSize int) []*chainhash.Hash { + b.Helper() + ctx := context.Background() + + var subtreeRootHashes []*chainhash.Hash + idx := 0 + + for idx < len(txHashes) { + // Determine how many txs go in this subtree + remaining := len(txHashes) - idx + count := subtreeSize + if remaining < count { + count = remaining + } + + subtree, err := subtreepkg.NewTreeByLeafCount(subtreeSize) + require.NoError(b, err) + + isFirst := len(subtreeRootHashes) == 0 + if isFirst { + err = subtree.AddCoinbaseNode() + require.NoError(b, err) + } + + for i := 0; i < count; i++ { + if isFirst && i == 0 { + // Skip first slot for coinbase in first subtree + continue + } + err = subtree.AddSubtreeNode(subtreepkg.Node{ + Hash: txHashes[idx+i], + Fee: 100, + SizeInBytes: 250, + }) + require.NoError(b, err) + + if subtree.IsComplete() { + break + } + } + + // Serialize and store the subtree + subtreeBytes, err := subtree.Serialize() + require.NoError(b, err) + + rootHash := subtree.RootHash() + err = store.Set(ctx, rootHash[:], fileformat.FileTypeSubtree, subtreeBytes, options.WithAllowOverwrite(true)) + require.NoError(b, err) + + // Create and store subtree meta + subtreeMeta := subtreepkg.NewSubtreeMeta(subtree) + parentHash := chainhash.HashH([]byte("bench-parent")) + 
for j := range subtree.Nodes { + _ = subtreeMeta.SetTxInpoints(j, subtreepkg.TxInpoints{ + ParentTxHashes: []chainhash.Hash{parentHash}, + Idxs: [][]uint32{{0}}, + }) + } + metaBytes, err := subtreeMeta.Serialize() + require.NoError(b, err) + err = store.Set(ctx, rootHash[:], fileformat.FileTypeSubtreeMeta, metaBytes, options.WithAllowOverwrite(true)) + require.NoError(b, err) + + hashCopy := *rootHash + subtreeRootHashes = append(subtreeRootHashes, &hashCopy) + + // Move index forward by actual number of non-coinbase nodes added + if isFirst { + idx += subtree.Length() - 1 // subtract coinbase + } else { + idx += subtree.Length() + } + } + + return subtreeRootHashes +} + +// populateMempool fills the subtree processor with mempool transactions +// to simulate pre-reorg state. +func populateMempool(b *testing.B, state *reorgBenchState) { + b.Helper() + + // Set the current block header to the losing block's header + // so moveForwardBlock's header validation passes + state.stp.InitCurrentBlockHeader(state.moveBackBlock.Header) + + // Add mempool transactions via addNode + for i, node := range state.mempoolNodes { + err := state.stp.addNode(node, state.mempoolParents[i], true) + if err != nil { + b.Fatalf("failed to add mempool node %d: %v", i, err) + } + } +} + +// resetProcessorState resets the subtree processor to a clean state for the next benchmark iteration. +func resetProcessorState(b *testing.B, state *reorgBenchState) { + b.Helper() + + ctx := context.Background() + tSettings := test.CreateBaseTestSettings(b) + tSettings.BlockAssembly.InitialMerkleItemsPerSubtree = state.subtreeSize + tSettings.BlockAssembly.StoreTxInpointsForSubtreeMeta = true + + stp, err := NewSubtreeProcessor(ctx, ulogger.TestLogger{}, tSettings, state.subtreeStore, state.blockchainClient, state.utxoStore, state.newSubtreeChan) + require.NoError(b, err) + + state.stp = stp +} + +// BenchmarkMoveBackBlock benchmarks the moveBackBlock operation in isolation. 
// This measures: blob reads + deserialization + per-node addNode calls for block txs + mempool txs.
func BenchmarkMoveBackBlock(b *testing.B) {
	benchmarks := []struct {
		name           string
		blockTxCount   int
		mempoolTxCount int
		subtreeSize    int
	}{
		{"1K_block_1K_mempool", 1_000, 1_000, 1024},
		{"10K_block_10K_mempool", 10_000, 10_000, 1024},
		{"10K_block_50K_mempool", 10_000, 50_000, 4096},
		{"50K_block_50K_mempool", 50_000, 50_000, 4096},
		{"100K_block_100K_mempool", 100_000, 100_000, 8192},
	}

	for _, bm := range benchmarks {
		b.Run(bm.name, func(b *testing.B) {
			// 50% overlap between losing and winning blocks is the representative default.
			state := setupReorgBenchmark(b, bm.blockTxCount, bm.mempoolTxCount, bm.subtreeSize, 0.5)

			b.ReportAllocs()
			b.ResetTimer()

			for i := 0; i < b.N; i++ {
				// Rebuild the processor and mempool outside the timed region so each
				// iteration measures only the moveBackBlock call itself.
				b.StopTimer()
				resetProcessorState(b, state)
				populateMempool(b, state)
				runtime.GC()
				b.StartTimer()

				_, _, err := state.stp.moveBackBlock(context.Background(), state.moveBackBlock, false)
				if err != nil {
					b.Fatalf("moveBackBlock failed: %v", err)
				}
			}

			b.ReportMetric(float64(bm.blockTxCount+bm.mempoolTxCount), "total_txs")
			b.ReportMetric(float64(bm.blockTxCount), "block_txs")
			b.ReportMetric(float64(bm.mempoolTxCount), "mempool_txs")
		})
	}
}

// BenchmarkMoveForwardBlock benchmarks the moveForwardBlock operation in isolation.
// This measures: subtree filtering + transaction map creation + processRemainderTxHashes.
func BenchmarkMoveForwardBlock(b *testing.B) {
	benchmarks := []struct {
		name           string
		blockTxCount   int
		mempoolTxCount int
		subtreeSize    int
	}{
		{"1K_block_1K_mempool", 1_000, 1_000, 1024},
		{"10K_block_10K_mempool", 10_000, 10_000, 1024},
		{"10K_block_50K_mempool", 10_000, 50_000, 4096},
		{"50K_block_50K_mempool", 50_000, 50_000, 4096},
		{"100K_block_100K_mempool", 100_000, 100_000, 8192},
	}

	for _, bm := range benchmarks {
		b.Run(bm.name, func(b *testing.B) {
			state := setupReorgBenchmark(b, bm.blockTxCount, bm.mempoolTxCount, bm.subtreeSize, 0.5)

			b.ReportAllocs()
			b.ResetTimer()

			for i := 0; i < b.N; i++ {
				// All setup (reset, mempool fill, moveBack, header rewrite) happens with
				// the timer stopped; only the moveForwardBlock call below is measured.
				b.StopTimer()
				resetProcessorState(b, state)
				populateMempool(b, state)

				// First do a moveBackBlock to get into a post-moveBack state
				_, _, err := state.stp.moveBackBlock(context.Background(), state.moveBackBlock, false)
				if err != nil {
					b.Fatalf("moveBackBlock failed: %v", err)
				}

				// Set the header to genesis so moveForwardBlock can validate the winning block
				state.stp.currentBlockHeader.Store(&model.BlockHeader{
					Version:        1,
					HashPrevBlock:  &chainhash.Hash{},
					HashMerkleRoot: &chainhash.Hash{},
					Timestamp:      1234567890,
					Bits:           model.NBit{},
					Nonce:          0,
				})

				runtime.GC()
				b.StartTimer()

				_, _, err = state.stp.moveForwardBlock(context.Background(), state.moveForwardBlock, true, make(map[chainhash.Hash]bool), true, false)
				if err != nil {
					b.Fatalf("moveForwardBlock failed: %v", err)
				}
			}

			b.ReportMetric(float64(bm.blockTxCount+bm.mempoolTxCount), "total_txs")
		})
	}
}

// BenchmarkFullReorg benchmarks the complete reorg path: moveBackBlock + moveForwardBlock.
// This is the primary benchmark to track — it measures end-to-end reorg performance.
func BenchmarkFullReorg(b *testing.B) {
	benchmarks := []struct {
		name            string
		blockTxCount    int
		mempoolTxCount  int
		subtreeSize     int
		overlapFraction float64
	}{
		// Small scale — fast iteration
		{"1K_block_1K_mempool_50pct_overlap", 1_000, 1_000, 1024, 0.5},

		// Medium scale — representative of small blocks
		{"10K_block_10K_mempool_50pct_overlap", 10_000, 10_000, 1024, 0.5},

		// Larger mempool than block — common production scenario
		{"10K_block_50K_mempool_50pct_overlap", 10_000, 50_000, 4096, 0.5},

		// Larger scale — starts to show contention effects
		{"50K_block_50K_mempool_50pct_overlap", 50_000, 50_000, 4096, 0.5},

		// Large scale — the target for optimization
		{"100K_block_100K_mempool_50pct_overlap", 100_000, 100_000, 8192, 0.5},

		// Overlap variation: 0% overlap (worst case — no shared txs between blocks)
		{"10K_block_10K_mempool_0pct_overlap", 10_000, 10_000, 1024, 0.0},

		// Overlap variation: 90% overlap (best case — nearly identical blocks)
		{"10K_block_10K_mempool_90pct_overlap", 10_000, 10_000, 1024, 0.9},
	}

	for _, bm := range benchmarks {
		b.Run(bm.name, func(b *testing.B) {
			state := setupReorgBenchmark(b, bm.blockTxCount, bm.mempoolTxCount, bm.subtreeSize, bm.overlapFraction)

			b.ReportAllocs()
			b.ResetTimer()

			for i := 0; i < b.N; i++ {
				// Processor/mempool reset happens with the timer stopped; the timed
				// region covers both reorg phases, including the header rewrite between.
				b.StopTimer()
				resetProcessorState(b, state)
				populateMempool(b, state)
				runtime.GC()
				b.StartTimer()

				// Step 1: moveBackBlock
				_, _, err := state.stp.moveBackBlock(context.Background(), state.moveBackBlock, false)
				if err != nil {
					b.Fatalf("moveBackBlock failed: %v", err)
				}

				// Reset header to genesis (parent of both losing and winning blocks)
				state.stp.currentBlockHeader.Store(&model.BlockHeader{
					Version:        1,
					HashPrevBlock:  &chainhash.Hash{},
					HashMerkleRoot: &chainhash.Hash{},
					Timestamp:      1234567890,
					Bits:           model.NBit{},
					Nonce:          0,
				})

				// Step 2: moveForwardBlock
				_, _, err = state.stp.moveForwardBlock(context.Background(),
					state.moveForwardBlock, true, make(map[chainhash.Hash]bool), true, false)
				if err != nil {
					b.Fatalf("moveForwardBlock failed: %v", err)
				}
			}

			totalTxs := bm.blockTxCount + bm.mempoolTxCount
			b.ReportMetric(float64(totalTxs), "total_txs")
			b.ReportMetric(float64(bm.blockTxCount), "block_txs")
			b.ReportMetric(float64(bm.mempoolTxCount), "mempool_txs")
			b.ReportMetric(bm.overlapFraction*100, "overlap_pct")
		})
	}
}

// BenchmarkAddNodeSequential benchmarks the per-node addNode overhead in isolation.
// This isolates the mutex lock + append + IsComplete + map insert cost per transaction.
func BenchmarkAddNodeSequential(b *testing.B) {
	benchmarks := []struct {
		name        string
		nodeCount   int
		subtreeSize int
	}{
		{"1K_nodes", 1_000, 1024},
		{"10K_nodes", 10_000, 4096},
		{"100K_nodes", 100_000, 8192},
	}

	for _, bm := range benchmarks {
		b.Run(bm.name, func(b *testing.B) {
			ctx := context.Background()
			tSettings := test.CreateBaseTestSettings(b)
			tSettings.BlockAssembly.InitialMerkleItemsPerSubtree = bm.subtreeSize

			// Drain subtree notifications so addNode never blocks on a full channel.
			newSubtreeChan := make(chan NewSubtreeRequest, 1024)
			go func() {
				for req := range newSubtreeChan {
					if req.ErrChan != nil {
						req.ErrChan <- nil
					}
				}
			}()
			defer close(newSubtreeChan)

			// Pre-generate nodes and parents
			nodes := make([]subtreepkg.Node, bm.nodeCount)
			parents := make([]*subtreepkg.TxInpoints, bm.nodeCount)
			parentHash := chainhash.HashH([]byte("parent"))
			for i := range nodes {
				nodes[i] = subtreepkg.Node{
					Hash:        chainhash.HashH([]byte(fmt.Sprintf("seq-node-%d", i))),
					Fee:         100,
					SizeInBytes: 250,
				}
				parents[i] = &subtreepkg.TxInpoints{ParentTxHashes: []chainhash.Hash{parentHash}}
			}

			b.ReportAllocs()
			b.ResetTimer()

			for i := 0; i < b.N; i++ {
				// A fresh processor per iteration keeps subtree fill level comparable.
				b.StopTimer()
				stp, err := NewSubtreeProcessor(ctx, ulogger.TestLogger{}, tSettings, nil, nil, nil, newSubtreeChan)
				if err != nil {
					b.Fatal(err)
				}
				runtime.GC()
				b.StartTimer()

				for j, node := range nodes {
					if err := stp.addNode(node, parents[j], true); err != nil {
						b.Fatalf("addNode failed at %d: %v", j, err)
					}
				}
			}

			b.ReportMetric(float64(bm.nodeCount), "nodes")
		})
	}
}

// BenchmarkProcessRemainderTxHashes benchmarks the remainder processing after moveForwardBlock.
// This isolates the filter + dedup + sequential addNode/addNodePreValidated cost.
func BenchmarkProcessRemainderTxHashes(b *testing.B) {
	benchmarks := []struct {
		name            string
		chainedSubtrees int
		txsPerSubtree   int
		blockTxPct      float64 // fraction of txs that are in the block (filtered out)
	}{
		{"10_subtrees_1K_each_50pct_filtered", 10, 1024, 0.5},
		{"50_subtrees_1K_each_50pct_filtered", 50, 1024, 0.5},
		{"100_subtrees_1K_each_50pct_filtered", 100, 1024, 0.5},
		{"10_subtrees_4K_each_50pct_filtered", 10, 4096, 0.5},
		{"50_subtrees_4K_each_10pct_filtered", 50, 4096, 0.1},
		{"50_subtrees_4K_each_90pct_filtered", 50, 4096, 0.9},
	}

	for _, bm := range benchmarks {
		b.Run(bm.name, func(b *testing.B) {
			ctx := context.Background()
			tSettings := test.CreateBaseTestSettings(b)
			tSettings.BlockAssembly.InitialMerkleItemsPerSubtree = bm.txsPerSubtree * 2 // large enough to not rotate

			// Drain subtree notifications so processing never blocks on a full channel.
			newSubtreeChan := make(chan NewSubtreeRequest, 1024)
			go func() {
				for req := range newSubtreeChan {
					if req.ErrChan != nil {
						req.ErrChan <- nil
					}
				}
			}()
			defer close(newSubtreeChan)

			totalTxs := bm.chainedSubtrees * bm.txsPerSubtree
			blockTxCount := int(float64(totalTxs) * bm.blockTxPct)

			// Create chained subtrees
			chainedSubtrees := make([]*subtreepkg.Subtree, bm.chainedSubtrees)
			allTxHashes := make([]chainhash.Hash, 0, totalTxs)
			parentHash := chainhash.HashH([]byte("parent"))

			for s := 0; s < bm.chainedSubtrees; s++ {
				subtree, err := subtreepkg.NewTreeByLeafCount(bm.txsPerSubtree)
				require.NoError(b, err)

				if s == 0 {
					_ = subtree.AddCoinbaseNode()
				}

				// NOTE(review): each subtree receives txsPerSubtree-1 nodes, so only the
				// first (coinbase-bearing) subtree is full — presumably intentional here.
				for i := 0; i < bm.txsPerSubtree-1; i++ {
					txHash := chainhash.HashH([]byte(fmt.Sprintf("rem-tx-%d-%d", s, i)))
					err = subtree.AddSubtreeNode(subtreepkg.Node{Hash: txHash, Fee: 100, SizeInBytes: 250})
					require.NoError(b, err)
					allTxHashes = append(allTxHashes, txHash)

					if subtree.IsComplete() {
						break
					}
				}

				chainedSubtrees[s] = subtree
			}

			// Create block transaction map (subset of allTxHashes)
			transactionMap := NewSplitSwissMap(1024, blockTxCount)
			for i := 0; i < blockTxCount && i < len(allTxHashes); i++ {
				_ = transactionMap.Put(allTxHashes[i])
			}

			// Create currentTxMap with all transactions
			currentTxMap := NewSplitTxInpointsMap(splitMapBuckets)
			for _, hash := range allTxHashes {
				currentTxMap.Set(hash, &subtreepkg.TxInpoints{ParentTxHashes: []chainhash.Hash{parentHash}})
			}

			b.ReportAllocs()
			b.ResetTimer()

			for i := 0; i < b.N; i++ {
				b.StopTimer()

				stp, err := NewSubtreeProcessor(ctx, ulogger.TestLogger{}, tSettings, nil, nil, nil, newSubtreeChan)
				require.NoError(b, err)

				runtime.GC()
				b.StartTimer()

				err = stp.processRemainderTxHashes(ctx, chainedSubtrees, transactionMap, nil, currentTxMap, true)
				if err != nil {
					b.Fatalf("processRemainderTxHashes failed: %v", err)
				}
			}

			b.ReportMetric(float64(totalTxs), "total_txs")
			b.ReportMetric(float64(totalTxs-blockTxCount), "remainder_txs")
			b.ReportMetric(bm.blockTxPct*100, "filtered_pct")
		})
	}
}

// BenchmarkReorgMemoryProfile runs a single reorg iteration and reports detailed memory stats.
// Not a proper benchmark (N=1), but useful for profiling memory usage.
+func BenchmarkReorgMemoryProfile(b *testing.B) { + benchmarks := []struct { + name string + blockTxCount int + mempoolTxCount int + subtreeSize int + }{ + {"10K_block_10K_mempool", 10_000, 10_000, 1024}, + {"50K_block_50K_mempool", 50_000, 50_000, 4096}, + {"100K_block_100K_mempool", 100_000, 100_000, 8192}, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + state := setupReorgBenchmark(b, bm.blockTxCount, bm.mempoolTxCount, bm.subtreeSize, 0.5) + + resetProcessorState(b, state) + populateMempool(b, state) + + runtime.GC() + var memBefore runtime.MemStats + runtime.ReadMemStats(&memBefore) + + start := time.Now() + + // moveBackBlock + _, _, err := state.stp.moveBackBlock(context.Background(), state.moveBackBlock, false) + require.NoError(b, err) + + var memAfterMoveBack runtime.MemStats + runtime.ReadMemStats(&memAfterMoveBack) + + moveBackDuration := time.Since(start) + + // Reset header to genesis + state.stp.currentBlockHeader.Store(&model.BlockHeader{ + Version: 1, + HashPrevBlock: &chainhash.Hash{}, + HashMerkleRoot: &chainhash.Hash{}, + Timestamp: 1234567890, + Bits: model.NBit{}, + Nonce: 0, + }) + + moveForwardStart := time.Now() + + // moveForwardBlock + _, _, err = state.stp.moveForwardBlock(context.Background(), state.moveForwardBlock, true, make(map[chainhash.Hash]bool), true, false) + require.NoError(b, err) + + moveForwardDuration := time.Since(moveForwardStart) + totalDuration := time.Since(start) + + var memAfter runtime.MemStats + runtime.ReadMemStats(&memAfter) + + b.ReportMetric(float64(moveBackDuration.Milliseconds()), "moveBack_ms") + b.ReportMetric(float64(moveForwardDuration.Milliseconds()), "moveForward_ms") + b.ReportMetric(float64(totalDuration.Milliseconds()), "total_ms") + b.ReportMetric(float64(memAfterMoveBack.TotalAlloc-memBefore.TotalAlloc)/(1024*1024), "moveBack_alloc_MB") + b.ReportMetric(float64(memAfter.TotalAlloc-memAfterMoveBack.TotalAlloc)/(1024*1024), "moveForward_alloc_MB") + 
b.ReportMetric(float64(memAfter.TotalAlloc-memBefore.TotalAlloc)/(1024*1024), "total_alloc_MB") + b.ReportMetric(float64(memAfter.Alloc)/(1024*1024), "heap_inuse_MB") + }) + } +} + +// TestReorgBenchmarkBaseline runs the full reorg at multiple scales and prints a summary table. +// This is a test (not benchmark) for easy "go test -run" invocation with human-readable output. +func TestReorgBenchmarkBaseline(t *testing.T) { + if testing.Short() { + t.Skip("skipping baseline benchmark in short mode") + } + + scales := []struct { + name string + blockTxCount int + mempoolTxCount int + subtreeSize int + }{ + {"1K", 1_000, 1_000, 1024}, + {"10K", 10_000, 10_000, 1024}, + {"50K", 50_000, 50_000, 4096}, + {"100K", 100_000, 100_000, 8192}, + } + + fmt.Println() + fmt.Println("=== Reorg Baseline Performance ===") + fmt.Printf("%-8s %12s %12s %12s %12s %12s\n", + "Scale", "MoveBack", "MoveForward", "Total", "Alloc(MB)", "Txs/sec") + fmt.Println("-------- ------------ ------------ ------------ ------------ ------------") + + for _, s := range scales { + state := setupReorgBenchmarkT(t, s.blockTxCount, s.mempoolTxCount, s.subtreeSize, 0.5) + resetProcessorStateT(t, state) + populateMempoolT(t, state) + + runtime.GC() + var memBefore runtime.MemStats + runtime.ReadMemStats(&memBefore) + + start := time.Now() + + _, _, err := state.stp.moveBackBlock(context.Background(), state.moveBackBlock, false) + require.NoError(t, err) + moveBackDuration := time.Since(start) + + state.stp.currentBlockHeader.Store(&model.BlockHeader{ + Version: 1, + HashPrevBlock: &chainhash.Hash{}, + HashMerkleRoot: &chainhash.Hash{}, + Timestamp: 1234567890, + Bits: model.NBit{}, + Nonce: 0, + }) + + moveForwardStart := time.Now() + _, _, err = state.stp.moveForwardBlock(context.Background(), state.moveForwardBlock, true, make(map[chainhash.Hash]bool), true, false) + require.NoError(t, err) + moveForwardDuration := time.Since(moveForwardStart) + + totalDuration := time.Since(start) + + var memAfter 
runtime.MemStats + runtime.ReadMemStats(&memAfter) + allocMB := float64(memAfter.TotalAlloc-memBefore.TotalAlloc) / (1024 * 1024) + + totalTxs := s.blockTxCount + s.mempoolTxCount + txPerSec := float64(totalTxs) / totalDuration.Seconds() + + fmt.Printf("%-8s %12s %12s %12s %12.1f %12.0f\n", + s.name, moveBackDuration.Round(time.Millisecond), moveForwardDuration.Round(time.Millisecond), + totalDuration.Round(time.Millisecond), allocMB, txPerSec) + } + + fmt.Println() +} + +// setupReorgBenchmark variant for *testing.T (used by TestReorgBenchmarkBaseline) +func setupReorgBenchmarkT(t *testing.T, blockTxCount, mempoolTxCount int, subtreeSize int, overlapFraction float64) *reorgBenchState { + t.Helper() + + ctx := context.Background() + subtreeStore := blob_memory.New() + + tSettings := test.CreateBaseTestSettings(t) + tSettings.BlockAssembly.InitialMerkleItemsPerSubtree = subtreeSize + tSettings.BlockAssembly.StoreTxInpointsForSubtreeMeta = true + + utxoStoreURL, err := url.Parse("sqlitememory:///test") + require.NoError(t, err) + + utxoStore, err := sql.New(ctx, ulogger.TestLogger{}, tSettings, utxoStoreURL) + require.NoError(t, err) + + newSubtreeChan := make(chan NewSubtreeRequest, 1024) + go func() { + for req := range newSubtreeChan { + if req.ErrChan != nil { + req.ErrChan <- nil + } + } + }() + t.Cleanup(func() { close(newSubtreeChan) }) + + mockBlockchainClient := &blockchain.Mock{} + mockBlockchainClient.On("GetBlocksMinedNotSet", mock.Anything).Return([]*model.Block{}, nil) + mockBlockchainClient.On("SetBlockProcessedAt", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mockBlockchainClient.On("GetBlockIsMined", mock.Anything, mock.Anything).Return(true, nil) + + stp, err := NewSubtreeProcessor(ctx, ulogger.TestLogger{}, tSettings, subtreeStore, mockBlockchainClient, utxoStore, newSubtreeChan) + require.NoError(t, err) + + overlapCount := int(float64(blockTxCount) * overlapFraction) + + sharedTxHashes := make([]chainhash.Hash, overlapCount) + for i 
:= range sharedTxHashes { + sharedTxHashes[i] = chainhash.HashH([]byte(fmt.Sprintf("shared-tx-%d", i))) + } + + losingOnlyCount := blockTxCount - overlapCount + losingOnlyTxHashes := make([]chainhash.Hash, losingOnlyCount) + for i := range losingOnlyTxHashes { + losingOnlyTxHashes[i] = chainhash.HashH([]byte(fmt.Sprintf("losing-tx-%d", i))) + } + + winningOnlyCount := blockTxCount - overlapCount + winningOnlyTxHashes := make([]chainhash.Hash, winningOnlyCount) + for i := range winningOnlyTxHashes { + winningOnlyTxHashes[i] = chainhash.HashH([]byte(fmt.Sprintf("winning-tx-%d", i))) + } + + mempoolTxHashes := make([]chainhash.Hash, mempoolTxCount) + for i := range mempoolTxHashes { + mempoolTxHashes[i] = chainhash.HashH([]byte(fmt.Sprintf("mempool-tx-%d", i))) + } + + losingBlockTxHashes := append(sharedTxHashes, losingOnlyTxHashes...) + losingSubtreeHashes := buildAndStoreSubtreesT(t, subtreeStore, losingBlockTxHashes, subtreeSize) + + winningBlockTxHashes := append(sharedTxHashes, winningOnlyTxHashes...) 
+ winningSubtreeHashes := buildAndStoreSubtreesT(t, subtreeStore, winningBlockTxHashes, subtreeSize) + + genesisHeader := &model.BlockHeader{ + Version: 1, + HashPrevBlock: &chainhash.Hash{}, + HashMerkleRoot: &chainhash.Hash{}, + Timestamp: 1234567890, + Bits: model.NBit{}, + Nonce: 0, + } + + losingBlockHeader := &model.BlockHeader{ + Version: 1, + HashPrevBlock: genesisHeader.Hash(), + HashMerkleRoot: &chainhash.Hash{}, + Timestamp: 1234567891, + Bits: model.NBit{}, + Nonce: 1, + } + + winningBlockHeader := &model.BlockHeader{ + Version: 1, + HashPrevBlock: genesisHeader.Hash(), + HashMerkleRoot: &chainhash.Hash{}, + Timestamp: 1234567892, + Bits: model.NBit{}, + Nonce: 2, + } + + mockBlockchainClient.On("GetBlockHeader", mock.Anything, genesisHeader.Hash()).Return(genesisHeader, &model.BlockHeaderMeta{}, nil) + + _, err = utxoStore.Create(ctx, coinbaseTx, 1) + require.NoError(t, err) + + losingBlock := &model.Block{ + Height: 1, + ID: 1, + CoinbaseTx: coinbaseTx, + Subtrees: losingSubtreeHashes, + Header: losingBlockHeader, + TransactionCount: uint64(blockTxCount), + } + + winningBlock := &model.Block{ + Height: 1, + ID: 2, + CoinbaseTx: coinbaseTx2, + Subtrees: winningSubtreeHashes, + Header: winningBlockHeader, + TransactionCount: uint64(blockTxCount), + } + + mempoolNodes := make([]subtreepkg.Node, mempoolTxCount) + mempoolParents := make([]*subtreepkg.TxInpoints, mempoolTxCount) + parentHash := chainhash.HashH([]byte("parent-tx")) + for i, hash := range mempoolTxHashes { + mempoolNodes[i] = subtreepkg.Node{Hash: hash, Fee: 100, SizeInBytes: 250} + mempoolParents[i] = &subtreepkg.TxInpoints{ParentTxHashes: []chainhash.Hash{parentHash}} + } + + return &reorgBenchState{ + stp: stp, + subtreeStore: subtreeStore, + utxoStore: utxoStore, + blockchainClient: mockBlockchainClient, + newSubtreeChan: newSubtreeChan, + moveBackBlock: losingBlock, + moveForwardBlock: winningBlock, + mempoolNodes: mempoolNodes, + mempoolParents: mempoolParents, + subtreeSize: 
subtreeSize, + mempoolTxCount: mempoolTxCount, + blockTxCount: blockTxCount, + overlapTxCount: overlapCount, + overlapFraction: overlapFraction, + } +} + +func buildAndStoreSubtreesT(t *testing.T, store *blob_memory.Memory, txHashes []chainhash.Hash, subtreeSize int) []*chainhash.Hash { + t.Helper() + ctx := context.Background() + + var subtreeRootHashes []*chainhash.Hash + idx := 0 + + for idx < len(txHashes) { + remaining := len(txHashes) - idx + count := subtreeSize + if remaining < count { + count = remaining + } + + subtree, err := subtreepkg.NewTreeByLeafCount(subtreeSize) + require.NoError(t, err) + + isFirst := len(subtreeRootHashes) == 0 + if isFirst { + err = subtree.AddCoinbaseNode() + require.NoError(t, err) + } + + for i := 0; i < count; i++ { + if isFirst && i == 0 { + continue + } + err = subtree.AddSubtreeNode(subtreepkg.Node{ + Hash: txHashes[idx+i], + Fee: 100, + SizeInBytes: 250, + }) + require.NoError(t, err) + + if subtree.IsComplete() { + break + } + } + + subtreeBytes, err := subtree.Serialize() + require.NoError(t, err) + + rootHash := subtree.RootHash() + err = store.Set(ctx, rootHash[:], fileformat.FileTypeSubtree, subtreeBytes, options.WithAllowOverwrite(true)) + require.NoError(t, err) + + subtreeMeta := subtreepkg.NewSubtreeMeta(subtree) + parentHash := chainhash.HashH([]byte("bench-parent")) + for j := range subtree.Nodes { + _ = subtreeMeta.SetTxInpoints(j, subtreepkg.TxInpoints{ + ParentTxHashes: []chainhash.Hash{parentHash}, + Idxs: [][]uint32{{0}}, + }) + } + metaBytes, err := subtreeMeta.Serialize() + require.NoError(t, err) + err = store.Set(ctx, rootHash[:], fileformat.FileTypeSubtreeMeta, metaBytes, options.WithAllowOverwrite(true)) + require.NoError(t, err) + + hashCopy := *rootHash + subtreeRootHashes = append(subtreeRootHashes, &hashCopy) + + if isFirst { + idx += subtree.Length() - 1 + } else { + idx += subtree.Length() + } + } + + return subtreeRootHashes +} + +func resetProcessorStateT(t *testing.T, state 
*reorgBenchState) { + t.Helper() + + ctx := context.Background() + tSettings := test.CreateBaseTestSettings(t) + tSettings.BlockAssembly.InitialMerkleItemsPerSubtree = state.subtreeSize + tSettings.BlockAssembly.StoreTxInpointsForSubtreeMeta = true + + stp, err := NewSubtreeProcessor(ctx, ulogger.TestLogger{}, tSettings, state.subtreeStore, state.blockchainClient, state.utxoStore, state.newSubtreeChan) + require.NoError(t, err) + + state.stp = stp +} + +func populateMempoolT(t *testing.T, state *reorgBenchState) { + t.Helper() + + state.stp.InitCurrentBlockHeader(state.moveBackBlock.Header) + + for i, node := range state.mempoolNodes { + err := state.stp.addNode(node, state.mempoolParents[i], true) + require.NoError(t, err, "failed to add mempool node %d", i) + } +} + +// init ensures the unused import for bt is used +var _ = bt.NewTx() diff --git a/stores/utxo/aerospike/longest_chain_production_bench_test.go b/stores/utxo/aerospike/longest_chain_production_bench_test.go index 075095c6cf..f615205ae3 100644 --- a/stores/utxo/aerospike/longest_chain_production_bench_test.go +++ b/stores/utxo/aerospike/longest_chain_production_bench_test.go @@ -168,6 +168,107 @@ func createProductionLikeTransactions(b *testing.B, ctx context.Context, store u return txHashes } +// BenchmarkConcurrentVsSequentialMarking compares running mark(true) and mark(false) +// on disjoint hash sets sequentially vs concurrently. This mirrors the Phase 2 reorg +// optimization where MarkTransactionsOnLongestChain(true) for winning-chain txs and +// MarkTransactionsOnLongestChain(false) for losing-chain txs run in parallel via errgroup. 
//
// Run:
//
//	go test -run=^$ -bench=BenchmarkConcurrentVsSequentialMarking -benchmem -benchtime=1x -timeout=30m ./stores/utxo/aerospike/
func BenchmarkConcurrentVsSequentialMarking(b *testing.B) {
	if testing.Short() {
		b.Skip("Skipping production-scale benchmark in short mode")
	}

	store, ctx := getSharedBenchStore(b)

	testCases := []struct {
		name       string
		count      int // per set (total = 2x)
		seedOffset int
	}{
		{"1K_per_set", 1_000, 2_000_000},
		{"10K_per_set", 10_000, 3_000_000},
		{"100K_per_set", 100_000, 4_000_000},
	}

	for _, tc := range testCases {
		tc := tc // capture loop variable for the closure (pre-Go 1.22 semantics)
		b.Run(tc.name, func(b *testing.B) {
			b.Logf("Creating %d txs for mark-true set + %d txs for mark-false set...", tc.count, tc.count)

			// Create two disjoint transaction sets (simulating winning vs losing chain)
			markTrueHashes := createProductionLikeTransactions(b, ctx, store, tc.count, tc.seedOffset)
			markFalseHashes := createProductionLikeTransactions(b, ctx, store, tc.count, tc.seedOffset+tc.count)

			// First mark them all as true so mark-false has something to flip
			err := store.MarkTransactionsOnLongestChain(ctx, markTrueHashes, true)
			require.NoError(b, err)
			err = store.MarkTransactionsOnLongestChain(ctx, markFalseHashes, true)
			require.NoError(b, err)

			runtime.GC()

			// --- Sequential baseline ---
			// NOTE(review): state is reset once before each sub-benchmark, not per
			// iteration, so iterations after the first flip already-flipped records —
			// acceptable for throughput comparison but worth confirming.
			b.Run("sequential", func(b *testing.B) {
				b.ResetTimer()
				for i := 0; i < b.N; i++ {
					start := time.Now()

					err := store.MarkTransactionsOnLongestChain(ctx, markTrueHashes, true)
					require.NoError(b, err)

					err = store.MarkTransactionsOnLongestChain(ctx, markFalseHashes, false)
					require.NoError(b, err)

					elapsed := time.Since(start)
					b.Logf(" Iter %d: %v (%.0f tx/sec total)", i+1, elapsed,
						float64(tc.count*2)/elapsed.Seconds())
				}
				b.StopTimer()
				throughput := float64(tc.count*2*b.N) / b.Elapsed().Seconds()
				b.ReportMetric(throughput, "tx/sec")
				b.Logf("SEQUENTIAL: %v total, %.0f tx/sec", b.Elapsed(), throughput)
			})

			// Reset state for concurrent benchmark
			err = store.MarkTransactionsOnLongestChain(ctx, markTrueHashes, true)
			require.NoError(b, err)
			err = store.MarkTransactionsOnLongestChain(ctx, markFalseHashes, true)
			require.NoError(b, err)

			runtime.GC()

			// --- Concurrent (Phase 2 approach) ---
			b.Run("concurrent", func(b *testing.B) {
				b.ResetTimer()
				for i := 0; i < b.N; i++ {
					start := time.Now()

					// Both marking directions run in parallel; gCtx cancels the sibling
					// if either call fails.
					g, gCtx := errgroup.WithContext(ctx)
					g.Go(func() error {
						return store.MarkTransactionsOnLongestChain(gCtx, markTrueHashes, true)
					})
					g.Go(func() error {
						return store.MarkTransactionsOnLongestChain(gCtx, markFalseHashes, false)
					})
					err := g.Wait()
					require.NoError(b, err)

					elapsed := time.Since(start)
					b.Logf(" Iter %d: %v (%.0f tx/sec total)", i+1, elapsed,
						float64(tc.count*2)/elapsed.Seconds())
				}
				b.StopTimer()
				throughput := float64(tc.count*2*b.N) / b.Elapsed().Seconds()
				b.ReportMetric(throughput, "tx/sec")
				b.Logf("CONCURRENT: %v total, %.0f tx/sec", b.Elapsed(), throughput)
			})
		})
	}
}

// createRealisticTransaction creates a unique coinbase transaction for each seed.
func createRealisticTransaction(seed int) *bt.Tx {
	coinbaseTxHex := "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff17032dff0c2f71646c6e6b2f5e931c7f7b6199adf35e1300ffffffff01d15fa012000000001976a91417db35d440a673a218e70a5b9d07f895facf50d288ac00000000"