From e8bd57c0e1f907f633199ce5628f16ce76b91fb8 Mon Sep 17 00:00:00 2001 From: freemans13 Date: Fri, 27 Feb 2026 10:17:30 +0000 Subject: [PATCH 1/3] feat: adaptive subtreeData skip during catchup when txs exist locally During catchup, subtree validation downloads massive subtreeData files from peers. When both nodes have received the same transactions (e.g. dev-scale with tx-blaster), the catching-up node already has all txs in its UTXO store. This optimization checks UTXO existence per-subtree before downloading subtreeData, skipping the download entirely when all txs exist locally. Uses a shared atomic flag so that once any subtree has missing txs, all remaining subtrees skip the probe and go straight to full subtreeData fetch, minimizing wasted bandwidth. Co-Authored-By: Claude Opus 4.6 --- pkg/utxocheck/existence.go | 59 +++++++ pkg/utxocheck/existence_test.go | 164 ++++++++++++++++++ services/blockvalidation/get_blocks.go | 107 +++++++++++- services/blockvalidation/get_blocks_test.go | 2 +- .../subtreevalidation/check_block_subtrees.go | 22 +++ .../subtreevalidation/streaming_processor.go | 110 +++++++++++- 6 files changed, 460 insertions(+), 4 deletions(-) create mode 100644 pkg/utxocheck/existence.go create mode 100644 pkg/utxocheck/existence_test.go diff --git a/pkg/utxocheck/existence.go b/pkg/utxocheck/existence.go new file mode 100644 index 0000000000..e1cec412f3 --- /dev/null +++ b/pkg/utxocheck/existence.go @@ -0,0 +1,59 @@ +package utxocheck + +import ( + "context" + + "github.com/bsv-blockchain/go-bt/v2/chainhash" + subtreepkg "github.com/bsv-blockchain/go-subtree" + "github.com/bsv-blockchain/teranode/stores/utxo" + "github.com/bsv-blockchain/teranode/stores/utxo/fields" +) + +// CheckAllTxsExistInUTXO batch-checks whether all transaction hashes exist in the UTXO store. +// It processes hashes in batches, skipping coinbase placeholders. +// Returns false immediately when any tx is missing or still in Creating state. 
+// Returns true only when ALL non-coinbase txs exist and are fully created.
+// On UTXO store error, returns (false, err) so callers can fall back to subtreeData.
+func CheckAllTxsExistInUTXO(ctx context.Context, utxoStore utxo.Store, txHashes []chainhash.Hash, batchSize int) (bool, error) {
+	if len(txHashes) == 0 {
+		return true, nil
+	}
+
+	if batchSize <= 0 {
+		batchSize = 1000
+	}
+
+	for i := 0; i < len(txHashes); i += batchSize {
+		end := i + batchSize
+		if end > len(txHashes) {
+			end = len(txHashes)
+		}
+
+		batch := make([]*utxo.UnresolvedMetaData, 0, end-i)
+		for j := i; j < end; j++ {
+			if txHashes[j].Equal(subtreepkg.CoinbasePlaceholderHashValue) {
+				continue
+			}
+			batch = append(batch, &utxo.UnresolvedMetaData{
+				Hash: txHashes[j],
+				Idx:  j,
+			})
+		}
+
+		if len(batch) == 0 {
+			continue
+		}
+
+		if err := utxoStore.BatchDecorate(ctx, batch, fields.Creating); err != nil {
+			return false, err
+		}
+
+		for _, item := range batch {
+			if item.Err != nil || item.Data == nil || item.Data.Creating {
+				return false, nil
+			}
+		}
+	}
+
+	return true, nil
+}
diff --git a/pkg/utxocheck/existence_test.go b/pkg/utxocheck/existence_test.go
new file mode 100644
index 0000000000..2d44391498
--- /dev/null
+++ b/pkg/utxocheck/existence_test.go
@@ -0,0 +1,164 @@
+package utxocheck
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/bsv-blockchain/go-bt/v2/chainhash"
+	subtreepkg "github.com/bsv-blockchain/go-subtree"
+	"github.com/bsv-blockchain/teranode/stores/utxo"
+	"github.com/bsv-blockchain/teranode/stores/utxo/meta"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCheckAllTxsExistInUTXO(t *testing.T) {
+	ctx := context.Background()
+
+	makeHash := func(i int) chainhash.Hash {
+		var h chainhash.Hash
+		copy(h[:], fmt.Sprintf("tx%030d", i)) // "tx" + 30 zero-padded digits = exactly 32 bytes, so no digit is truncated and each i yields a distinct hash
+		return h
+	}
+
+	t.Run("empty hashes returns true", func(t *testing.T) {
+		mockStore := &utxo.MockUtxostore{}
+		result, err := 
CheckAllTxsExistInUTXO(ctx, mockStore, nil, 1000) + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("all txs exist returns true", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{makeHash(1), makeHash(2), makeHash(3)} + + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + batch := args.Get(1).([]*utxo.UnresolvedMetaData) + for _, item := range batch { + item.Data = &meta.Data{Creating: false} + } + }). + Return(nil) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("missing tx returns false", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{makeHash(1), makeHash(2), makeHash(3)} + + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + batch := args.Get(1).([]*utxo.UnresolvedMetaData) + batch[0].Data = &meta.Data{Creating: false} + // batch[1] left nil — missing tx + }). + Return(nil) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.NoError(t, err) + assert.False(t, result) + }) + + t.Run("creating tx returns false", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{makeHash(1)} + + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + batch := args.Get(1).([]*utxo.UnresolvedMetaData) + batch[0].Data = &meta.Data{Creating: true} + }). + Return(nil) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.NoError(t, err) + assert.False(t, result) + }) + + t.Run("BatchDecorate error returns false with error", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{makeHash(1)} + + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). 
+ Return(fmt.Errorf("store error")) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.Error(t, err) + assert.False(t, result) + }) + + t.Run("coinbase placeholder is skipped", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{subtreepkg.CoinbasePlaceholderHashValue, makeHash(1)} + + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + batch := args.Get(1).([]*utxo.UnresolvedMetaData) + require.Len(t, batch, 1, "coinbase should be filtered out") + batch[0].Data = &meta.Data{Creating: false} + }). + Return(nil) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("batching works correctly", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := make([]chainhash.Hash, 5) + for i := range hashes { + hashes[i] = makeHash(i) + } + + callCount := 0 + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + batch := args.Get(1).([]*utxo.UnresolvedMetaData) + for _, item := range batch { + item.Data = &meta.Data{Creating: false} + } + callCount++ + }). + Return(nil) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 2) + require.NoError(t, err) + assert.True(t, result) + assert.Equal(t, 3, callCount, "5 items with batch size 2 should create 3 batches") + }) + + t.Run("item error returns false", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{makeHash(1)} + + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + batch := args.Get(1).([]*utxo.UnresolvedMetaData) + batch[0].Err = fmt.Errorf("not found") + }). 
+ Return(nil) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.NoError(t, err) + assert.False(t, result) + }) + + t.Run("only coinbase hashes returns true without calling store", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{subtreepkg.CoinbasePlaceholderHashValue} + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.NoError(t, err) + assert.True(t, result) + // BatchDecorate should not be called since only coinbase hashes + mockStore.AssertNotCalled(t, "BatchDecorate") + }) + +} diff --git a/services/blockvalidation/get_blocks.go b/services/blockvalidation/get_blocks.go index f28f72aafe..da7673d7cc 100644 --- a/services/blockvalidation/get_blocks.go +++ b/services/blockvalidation/get_blocks.go @@ -14,6 +14,7 @@ import ( "github.com/bsv-blockchain/teranode/errors" "github.com/bsv-blockchain/teranode/model" "github.com/bsv-blockchain/teranode/pkg/fileformat" + "github.com/bsv-blockchain/teranode/pkg/utxocheck" "github.com/bsv-blockchain/teranode/stores/blob/options" "github.com/bsv-blockchain/teranode/util" "github.com/bsv-blockchain/teranode/util/tracing" @@ -206,8 +207,8 @@ func (u *Server) blockWorker(ctx context.Context, workerID int, workQueue <-chan return nil } - // Fetch subtree data for this block - err := u.fetchSubtreeDataForBlock(ctx, work.block, peerID, baseURL) + // Fetch subtree data for this block, adaptively skipping subtreeData when txs exist locally + err := u.fetchBlockSubtreesAdaptive(ctx, work.block, peerID, baseURL) if err != nil { // Send result (even if error occurred) result := resultItem{ @@ -374,6 +375,108 @@ func (u *Server) fetchSubtreeDataForBlock(gCtx context.Context, block *model.Blo return nil } +// fetchBlockSubtreesAdaptive processes subtrees with per-subtree adaptive logic. +// For each subtree it first fetches just the subtree hashes and checks UTXO existence. 
+// If all txs exist locally, the subtreeData download is skipped entirely. +// Once any subtree has missing txs, all remaining subtrees go straight to full fetch. +func (u *Server) fetchBlockSubtreesAdaptive(gCtx context.Context, block *model.Block, peerID, baseURL string) error { + ctx, _, deferFn := tracing.Tracer("blockvalidation").Start(gCtx, "fetchBlockSubtreesAdaptive", + tracing.WithParentStat(u.stats), + tracing.WithLogMessage(u.logger, "[catchup:fetchBlockSubtreesAdaptive][%s] adaptive subtree fetch for block with %d subtrees", block.Hash().String(), len(block.Subtrees)), + ) + defer deferFn() + + if len(block.Subtrees) == 0 { + u.logger.Debugf("[catchup:fetchBlockSubtreesAdaptive] Block %s has no subtrees, skipping", block.Hash().String()) + return nil + } + + g, ctx := errgroup.WithContext(ctx) + subtreeConcurrency := 8 + if u.settings.BlockValidation.SubtreeFetchConcurrency > 0 { + subtreeConcurrency = u.settings.BlockValidation.SubtreeFetchConcurrency + } + g.SetLimit(subtreeConcurrency) + + // Get peer assignments for subtrees if parallel fetching is enabled + var peerAssignments []*PeerForSubtreeFetch + if u.settings.BlockValidation.CatchupParallelFetchEnabled && u.p2pClient != nil { + var err error + peerAssignments, err = DistributeSubtreesAcrossPeers(ctx, u.logger, u.p2pClient, peerID, baseURL, len(block.Subtrees)) + if err != nil { + u.logger.Warnf("[catchup:fetchBlockSubtreesAdaptive][%s] Failed to distribute subtrees across peers: %v, using single peer", block.Hash().String(), err) + peerAssignments = nil + } + } + + // Shared flag: once set, all subsequent subtrees skip UTXO probing + var needsSubtreeData atomic.Bool + // If no UTXO store is available, skip the optimization entirely + if u.utxoStore == nil { + needsSubtreeData.Store(true) + } + + for i, subtreeHash := range block.Subtrees { + subtreeHashCopy := *subtreeHash + subtreeIndex := i + + fetchPeerID := peerID + fetchBaseURL := baseURL + if peerAssignments != nil && subtreeIndex < 
len(peerAssignments) { + assignment := peerAssignments[subtreeIndex] + fetchPeerID = assignment.PeerID + fetchBaseURL = assignment.BaseURL + } + + capturedPeerID := fetchPeerID + capturedBaseURL := fetchBaseURL + + g.Go(func() error { + // If flag is already set, go straight to full fetch + if needsSubtreeData.Load() { + return u.fetchAndStoreSubtreeAndSubtreeData(ctx, block, &subtreeHashCopy, capturedPeerID, capturedBaseURL) + } + + // Try subtree-only: fetch just the hashes and check UTXO existence + subtree, err := u.fetchAndStoreSubtree(ctx, block, &subtreeHashCopy, capturedPeerID, capturedBaseURL) + if err != nil { + // Can't get subtree hashes, fall back to full fetch + needsSubtreeData.Store(true) + return u.fetchAndStoreSubtreeAndSubtreeData(ctx, block, &subtreeHashCopy, capturedPeerID, capturedBaseURL) + } + + // Extract tx hashes from subtree nodes + txHashes := make([]chainhash.Hash, len(subtree.Nodes)) + for j, node := range subtree.Nodes { + txHashes[j] = node.Hash + } + + allExist, err := utxocheck.CheckAllTxsExistInUTXO(ctx, u.utxoStore, txHashes, 1000) + if err != nil { + u.logger.Warnf("[catchup:fetchBlockSubtreesAdaptive][%s] UTXO existence check failed for subtree %s: %v, falling back to subtreeData", block.Hash().String(), subtreeHashCopy.String(), err) + needsSubtreeData.Store(true) + return u.fetchAndStoreSubtreeData(ctx, block, &subtreeHashCopy, subtree, capturedPeerID, capturedBaseURL) + } + + if !allExist { + u.logger.Infof("[catchup:fetchBlockSubtreesAdaptive][%s] Missing txs found in subtree %s, switching to subtreeData mode", block.Hash().String(), subtreeHashCopy.String()) + needsSubtreeData.Store(true) + return u.fetchAndStoreSubtreeData(ctx, block, &subtreeHashCopy, subtree, capturedPeerID, capturedBaseURL) + } + + // All txs exist locally - no subtreeData needed for this subtree + u.logger.Infof("[catchup:fetchBlockSubtreesAdaptive][%s] All %d txs exist in UTXO store, skipping subtreeData download for subtree %s", 
block.Hash().String(), len(txHashes), subtreeHashCopy.String()) + return nil + }) + } + + if err := g.Wait(); err != nil { + return errors.NewServiceError("[catchup:fetchBlockSubtreesAdaptive] Failed to fetch subtrees for block %s", block.Hash().String(), err) + } + + return nil +} + // fetchAndStoreSubtree fetches and stores only the subtree (for subtreeToCheck) func (u *Server) fetchAndStoreSubtree(ctx context.Context, block *model.Block, subtreeHash *chainhash.Hash, peerID, baseURL string) (*subtreepkg.Subtree, error) { ctx, _, deferFn := tracing.Tracer("blockvalidation").Start(ctx, "fetchAndStoreSubtree", diff --git a/services/blockvalidation/get_blocks_test.go b/services/blockvalidation/get_blocks_test.go index d2ba976cd7..7de7c7ea5c 100644 --- a/services/blockvalidation/get_blocks_test.go +++ b/services/blockvalidation/get_blocks_test.go @@ -2592,7 +2592,7 @@ func TestBlockWorker(t *testing.T) { // Check result result := <-resultQueue assert.Error(t, result.err, "Should propagate subtree fetch error") - assert.Contains(t, result.err.Error(), "Failed to fetch subtree data for block") + assert.Contains(t, result.err.Error(), "Failed to fetch subtrees for block") }) t.Run("WorkerHandlesEmptyQueue", func(t *testing.T) { diff --git a/services/subtreevalidation/check_block_subtrees.go b/services/subtreevalidation/check_block_subtrees.go index 6f2e76805b..ff860586d8 100644 --- a/services/subtreevalidation/check_block_subtrees.go +++ b/services/subtreevalidation/check_block_subtrees.go @@ -14,6 +14,7 @@ import ( "github.com/bsv-blockchain/teranode/errors" "github.com/bsv-blockchain/teranode/model" "github.com/bsv-blockchain/teranode/pkg/fileformat" + "github.com/bsv-blockchain/teranode/pkg/utxocheck" "github.com/bsv-blockchain/teranode/services/blockchain" "github.com/bsv-blockchain/teranode/services/subtreevalidation/subtreevalidation_api" "github.com/bsv-blockchain/teranode/services/validator" @@ -149,6 +150,8 @@ func (u *Server) CheckBlockSubtrees(ctx 
context.Context, request *subtreevalidat // Load transactions for this batch of subtrees in parallel subtreeTxs := make([][]*bt.Tx, len(batchSubtrees)) + // Adaptive flag: once any subtree has missing txs, skip UTXO check for remaining subtrees + var needsSubtreeData atomic.Bool g, gCtx := errgroup.WithContext(ctx) util.SafeSetLimit(g, u.settings.SubtreeValidation.CheckBlockSubtreesConcurrency) @@ -236,6 +239,25 @@ func (u *Server) CheckBlockSubtrees(ctx context.Context, request *subtreevalidat } } + // Adaptive skip: check if all txs exist in UTXO store before downloading subtreeData + if !needsSubtreeData.Load() { + txHashes := make([]chainhash.Hash, len(subtreeToCheck.Nodes)) + for j, node := range subtreeToCheck.Nodes { + txHashes[j] = node.Hash + } + allExist, checkErr := utxocheck.CheckAllTxsExistInUTXO(gCtx, u.utxoStore, txHashes, 1000) + if checkErr != nil { + u.logger.Warnf("[CheckBlockSubtrees][%s] UTXO existence check failed: %v, falling back to subtreeData", subtreeHash.String(), checkErr) + needsSubtreeData.Store(true) + } else if allExist { + u.logger.Infof("[CheckBlockSubtrees][%s] All %d txs exist in UTXO store, skipping subtreeData download", subtreeHash.String(), len(txHashes)) + return nil + } else { + u.logger.Infof("[CheckBlockSubtrees][%s] Missing txs found, switching to subtreeData mode for block %s", subtreeHash.String(), block.Hash().String()) + needsSubtreeData.Store(true) + } + } + // PHASE 2: Exact pre-allocation subtreeTxs[subtreeIdx] = make([]*bt.Tx, 0, subtreeToCheck.Length()) diff --git a/services/subtreevalidation/streaming_processor.go b/services/subtreevalidation/streaming_processor.go index c0d03cfae0..eb4492f134 100644 --- a/services/subtreevalidation/streaming_processor.go +++ b/services/subtreevalidation/streaming_processor.go @@ -16,6 +16,7 @@ import ( "github.com/bsv-blockchain/teranode/errors" "github.com/bsv-blockchain/teranode/model" "github.com/bsv-blockchain/teranode/pkg/fileformat" + 
"github.com/bsv-blockchain/teranode/pkg/utxocheck" "github.com/bsv-blockchain/teranode/services/blockchain" "github.com/bsv-blockchain/teranode/services/subtreevalidation/subtreevalidation_api" "github.com/bsv-blockchain/teranode/services/validator" @@ -260,6 +261,90 @@ func (u *Server) processMissingSubtreesStreaming(ctx context.Context, request *s return blockIds, nil } +// loadSubtreeOnly loads/fetches just the subtree structure (tx hashes) without downloading subtreeData. +// Returns the tx hashes extracted from the subtree nodes. +func (u *Server) loadSubtreeOnly(ctx context.Context, request *subtreevalidation_api.CheckBlockSubtreesRequest, subtreeHash chainhash.Hash, peerID string, dah uint32) ([]chainhash.Hash, error) { + subtreeToCheckExists, err := u.subtreeStore.Exists(ctx, subtreeHash[:], fileformat.FileTypeSubtreeToCheck) + if err != nil { + return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to check if subtree exists", subtreeHash.String(), err) + } + + var subtreeToCheck *subtreepkg.Subtree + + if subtreeToCheckExists { + subtreeReader, err := u.subtreeStore.GetIoReader(ctx, subtreeHash[:], fileformat.FileTypeSubtreeToCheck) + if err != nil { + return nil, errors.NewStorageError("[loadSubtreeOnly][%s] failed to get subtree from store", subtreeHash.String(), err) + } + defer subtreeReader.Close() + + bufferedReader := bufioReaderPool.Get().(*bufio.Reader) + bufferedReader.Reset(subtreeReader) + defer func() { + bufferedReader.Reset(nil) + bufioReaderPool.Put(bufferedReader) + }() + + subtreeToCheck, err = subtreepkg.NewSubtreeFromReader(bufferedReader) + if err != nil { + return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to deserialize subtree", subtreeHash.String(), err) + } + } else { + url := fmt.Sprintf("%s/subtree/%s", request.BaseUrl, subtreeHash.String()) + + subtreeNodeBytes, err := util.DoHTTPRequest(ctx, url) + if err != nil { + return nil, errors.NewServiceError("[loadSubtreeOnly][%s] failed to get subtree 
from %s", subtreeHash.String(), url, err) + } + + if u.p2pClient != nil && peerID != "" { + if err := u.p2pClient.RecordBytesDownloaded(ctx, peerID, uint64(len(subtreeNodeBytes))); err != nil { + u.logger.Warnf("[loadSubtreeOnly][%s] failed to record bytes downloaded: %v", subtreeHash.String(), err) + } + } + + subtreeToCheck, err = subtreepkg.NewIncompleteTreeByLeafCount(len(subtreeNodeBytes) / chainhash.HashSize) + if err != nil { + return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to create subtree structure", subtreeHash.String(), err) + } + + var nodeHash chainhash.Hash + for i := 0; i < len(subtreeNodeBytes)/chainhash.HashSize; i++ { + copy(nodeHash[:], subtreeNodeBytes[i*chainhash.HashSize:(i+1)*chainhash.HashSize]) + + if nodeHash.Equal(subtreepkg.CoinbasePlaceholderHashValue) { + if err = subtreeToCheck.AddCoinbaseNode(); err != nil { + return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to add coinbase node", subtreeHash.String(), err) + } + } else { + if err = subtreeToCheck.AddNode(nodeHash, 0, 0); err != nil { + return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to add node", subtreeHash.String(), err) + } + } + } + + if !subtreeHash.Equal(*subtreeToCheck.RootHash()) { + return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] subtree root hash mismatch: %s", subtreeHash.String(), subtreeToCheck.RootHash().String()) + } + + subtreeBytes, err := subtreeToCheck.Serialize() + if err != nil { + return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to serialize subtree", subtreeHash.String(), err) + } + + if err = u.subtreeStore.Set(ctx, subtreeHash[:], fileformat.FileTypeSubtreeToCheck, subtreeBytes, options.WithDeleteAt(dah), options.WithAllowOverwrite(true)); err != nil { + return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to store subtree", subtreeHash.String(), err) + } + } + + txHashes := make([]chainhash.Hash, len(subtreeToCheck.Nodes)) + for j, node := range 
subtreeToCheck.Nodes { + txHashes[j] = node.Hash + } + + return txHashes, nil +} + // loadSubtreeTransactions loads transactions from a single subtree. // This is extracted from the original processMissingSubtrees logic. func (u *Server) loadSubtreeTransactions(ctx context.Context, request *subtreevalidation_api.CheckBlockSubtreesRequest, subtreeHash chainhash.Hash, peerID string, dah uint32) ([]*bt.Tx, error) { @@ -722,8 +807,31 @@ func (sp *streamingProcessor) streamAndFilterSubtrees( dah uint32, unvalidatedTxChan chan<- *bt.Tx, ) error { + needsSubtreeData := false + for i, subtreeHash := range missingSubtrees { - // Load subtree transactions + // Adaptive skip: try subtree-only path if no missing txs found yet + if !needsSubtreeData { + txHashes, loadErr := sp.server.loadSubtreeOnly(ctx, request, subtreeHash, peerID, dah) + if loadErr != nil { + sp.server.logger.Warnf("[streamAndFilterSubtrees] Failed to load subtree-only for %s: %v, falling back to full path", subtreeHash.String(), loadErr) + needsSubtreeData = true + } else { + allExist, checkErr := utxocheck.CheckAllTxsExistInUTXO(ctx, sp.server.utxoStore, txHashes, 1000) + if checkErr != nil { + sp.server.logger.Warnf("[streamAndFilterSubtrees] UTXO existence check failed for %s: %v, falling back to subtreeData", subtreeHash.String(), checkErr) + needsSubtreeData = true + } else if allExist { + sp.server.logger.Infof("[streamAndFilterSubtrees] Subtree %d/%d (%s): all %d txs exist in UTXO store, skipping subtreeData", i+1, len(missingSubtrees), subtreeHash.String(), len(txHashes)) + continue + } else { + sp.server.logger.Infof("[streamAndFilterSubtrees] Subtree %d/%d (%s): missing txs found, switching to subtreeData mode", i+1, len(missingSubtrees), subtreeHash.String()) + needsSubtreeData = true + } + } + } + + // Load subtree transactions (full path with subtreeData) txs, err := sp.server.loadSubtreeTransactions(ctx, request, subtreeHash, peerID, dah) if err != nil { return err From 
c1ce725e691467be2ed1646f474f078460fcb429 Mon Sep 17 00:00:00 2001 From: freemans13 Date: Fri, 27 Feb 2026 10:23:06 +0000 Subject: [PATCH 2/3] fix: re-check needsSubtreeData flag to close race window Add a second flag check after the subtree-hash fetch completes but before starting the expensive UTXO BatchDecorate call. This closes the window where a goroutine passes the initial check, another goroutine sets the flag while the first is fetching hashes, and the first goroutine then does an unnecessary UTXO existence check. Co-Authored-By: Claude Opus 4.6 --- services/blockvalidation/get_blocks.go | 5 ++++ .../subtreevalidation/check_block_subtrees.go | 25 ++++++++++++------- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/services/blockvalidation/get_blocks.go b/services/blockvalidation/get_blocks.go index da7673d7cc..8f905763f8 100644 --- a/services/blockvalidation/get_blocks.go +++ b/services/blockvalidation/get_blocks.go @@ -445,6 +445,11 @@ func (u *Server) fetchBlockSubtreesAdaptive(gCtx context.Context, block *model.B return u.fetchAndStoreSubtreeAndSubtreeData(ctx, block, &subtreeHashCopy, capturedPeerID, capturedBaseURL) } + // Re-check flag: another goroutine may have set it while we were fetching hashes + if needsSubtreeData.Load() { + return u.fetchAndStoreSubtreeData(ctx, block, &subtreeHashCopy, subtree, capturedPeerID, capturedBaseURL) + } + // Extract tx hashes from subtree nodes txHashes := make([]chainhash.Hash, len(subtree.Nodes)) for j, node := range subtree.Nodes { diff --git a/services/subtreevalidation/check_block_subtrees.go b/services/subtreevalidation/check_block_subtrees.go index ff860586d8..43597bfbed 100644 --- a/services/subtreevalidation/check_block_subtrees.go +++ b/services/subtreevalidation/check_block_subtrees.go @@ -245,16 +245,23 @@ func (u *Server) CheckBlockSubtrees(ctx context.Context, request *subtreevalidat for j, node := range subtreeToCheck.Nodes { txHashes[j] = node.Hash } - allExist, checkErr := 
utxocheck.CheckAllTxsExistInUTXO(gCtx, u.utxoStore, txHashes, 1000) - if checkErr != nil { - u.logger.Warnf("[CheckBlockSubtrees][%s] UTXO existence check failed: %v, falling back to subtreeData", subtreeHash.String(), checkErr) - needsSubtreeData.Store(true) - } else if allExist { - u.logger.Infof("[CheckBlockSubtrees][%s] All %d txs exist in UTXO store, skipping subtreeData download", subtreeHash.String(), len(txHashes)) - return nil + + // Re-check flag before the expensive UTXO call — another goroutine may have set it + // while we were loading the subtreeToCheck above + if needsSubtreeData.Load() { + u.logger.Debugf("[CheckBlockSubtrees][%s] Flag set by another goroutine, skipping UTXO check", subtreeHash.String()) } else { - u.logger.Infof("[CheckBlockSubtrees][%s] Missing txs found, switching to subtreeData mode for block %s", subtreeHash.String(), block.Hash().String()) - needsSubtreeData.Store(true) + allExist, checkErr := utxocheck.CheckAllTxsExistInUTXO(gCtx, u.utxoStore, txHashes, 1000) + if checkErr != nil { + u.logger.Warnf("[CheckBlockSubtrees][%s] UTXO existence check failed: %v, falling back to subtreeData", subtreeHash.String(), checkErr) + needsSubtreeData.Store(true) + } else if allExist { + u.logger.Infof("[CheckBlockSubtrees][%s] All %d txs exist in UTXO store, skipping subtreeData download", subtreeHash.String(), len(txHashes)) + return nil + } else { + u.logger.Infof("[CheckBlockSubtrees][%s] Missing txs found, switching to subtreeData mode for block %s", subtreeHash.String(), block.Hash().String()) + needsSubtreeData.Store(true) + } } } From e25b03d45eb5c6e6955551313745dc51b2e63cce Mon Sep 17 00:00:00 2001 From: freemans13 Date: Fri, 27 Feb 2026 10:59:40 +0000 Subject: [PATCH 3/3] fix: remove UTXO existence checks from validation paths The UTXO check in check_block_subtrees.go and streaming_processor.go caused validation to be skipped entirely when all txs existed in the UTXO store. 
This broke tests using the SQL backend (which doesn't store the Creating field) and would also skip block-level validation in production. The optimization is kept only in fetchBlockSubtreesAdaptive (pre-fetch phase), which correctly skips subtreeData downloads while letting the validation pipeline run fully. Co-Authored-By: Claude Opus 4.6 --- .../subtreevalidation/check_block_subtrees.go | 29 ----- .../subtreevalidation/streaming_processor.go | 110 +----------------- 2 files changed, 1 insertion(+), 138 deletions(-) diff --git a/services/subtreevalidation/check_block_subtrees.go b/services/subtreevalidation/check_block_subtrees.go index 43597bfbed..6f2e76805b 100644 --- a/services/subtreevalidation/check_block_subtrees.go +++ b/services/subtreevalidation/check_block_subtrees.go @@ -14,7 +14,6 @@ import ( "github.com/bsv-blockchain/teranode/errors" "github.com/bsv-blockchain/teranode/model" "github.com/bsv-blockchain/teranode/pkg/fileformat" - "github.com/bsv-blockchain/teranode/pkg/utxocheck" "github.com/bsv-blockchain/teranode/services/blockchain" "github.com/bsv-blockchain/teranode/services/subtreevalidation/subtreevalidation_api" "github.com/bsv-blockchain/teranode/services/validator" @@ -150,8 +149,6 @@ func (u *Server) CheckBlockSubtrees(ctx context.Context, request *subtreevalidat // Load transactions for this batch of subtrees in parallel subtreeTxs := make([][]*bt.Tx, len(batchSubtrees)) - // Adaptive flag: once any subtree has missing txs, skip UTXO check for remaining subtrees - var needsSubtreeData atomic.Bool g, gCtx := errgroup.WithContext(ctx) util.SafeSetLimit(g, u.settings.SubtreeValidation.CheckBlockSubtreesConcurrency) @@ -239,32 +236,6 @@ func (u *Server) CheckBlockSubtrees(ctx context.Context, request *subtreevalidat } } - // Adaptive skip: check if all txs exist in UTXO store before downloading subtreeData - if !needsSubtreeData.Load() { - txHashes := make([]chainhash.Hash, len(subtreeToCheck.Nodes)) - for j, node := range subtreeToCheck.Nodes 
{ - txHashes[j] = node.Hash - } - - // Re-check flag before the expensive UTXO call — another goroutine may have set it - // while we were loading the subtreeToCheck above - if needsSubtreeData.Load() { - u.logger.Debugf("[CheckBlockSubtrees][%s] Flag set by another goroutine, skipping UTXO check", subtreeHash.String()) - } else { - allExist, checkErr := utxocheck.CheckAllTxsExistInUTXO(gCtx, u.utxoStore, txHashes, 1000) - if checkErr != nil { - u.logger.Warnf("[CheckBlockSubtrees][%s] UTXO existence check failed: %v, falling back to subtreeData", subtreeHash.String(), checkErr) - needsSubtreeData.Store(true) - } else if allExist { - u.logger.Infof("[CheckBlockSubtrees][%s] All %d txs exist in UTXO store, skipping subtreeData download", subtreeHash.String(), len(txHashes)) - return nil - } else { - u.logger.Infof("[CheckBlockSubtrees][%s] Missing txs found, switching to subtreeData mode for block %s", subtreeHash.String(), block.Hash().String()) - needsSubtreeData.Store(true) - } - } - } - // PHASE 2: Exact pre-allocation subtreeTxs[subtreeIdx] = make([]*bt.Tx, 0, subtreeToCheck.Length()) diff --git a/services/subtreevalidation/streaming_processor.go b/services/subtreevalidation/streaming_processor.go index eb4492f134..c0d03cfae0 100644 --- a/services/subtreevalidation/streaming_processor.go +++ b/services/subtreevalidation/streaming_processor.go @@ -16,7 +16,6 @@ import ( "github.com/bsv-blockchain/teranode/errors" "github.com/bsv-blockchain/teranode/model" "github.com/bsv-blockchain/teranode/pkg/fileformat" - "github.com/bsv-blockchain/teranode/pkg/utxocheck" "github.com/bsv-blockchain/teranode/services/blockchain" "github.com/bsv-blockchain/teranode/services/subtreevalidation/subtreevalidation_api" "github.com/bsv-blockchain/teranode/services/validator" @@ -261,90 +260,6 @@ func (u *Server) processMissingSubtreesStreaming(ctx context.Context, request *s return blockIds, nil } -// loadSubtreeOnly loads/fetches just the subtree structure (tx hashes) without 
downloading subtreeData. -// Returns the tx hashes extracted from the subtree nodes. -func (u *Server) loadSubtreeOnly(ctx context.Context, request *subtreevalidation_api.CheckBlockSubtreesRequest, subtreeHash chainhash.Hash, peerID string, dah uint32) ([]chainhash.Hash, error) { - subtreeToCheckExists, err := u.subtreeStore.Exists(ctx, subtreeHash[:], fileformat.FileTypeSubtreeToCheck) - if err != nil { - return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to check if subtree exists", subtreeHash.String(), err) - } - - var subtreeToCheck *subtreepkg.Subtree - - if subtreeToCheckExists { - subtreeReader, err := u.subtreeStore.GetIoReader(ctx, subtreeHash[:], fileformat.FileTypeSubtreeToCheck) - if err != nil { - return nil, errors.NewStorageError("[loadSubtreeOnly][%s] failed to get subtree from store", subtreeHash.String(), err) - } - defer subtreeReader.Close() - - bufferedReader := bufioReaderPool.Get().(*bufio.Reader) - bufferedReader.Reset(subtreeReader) - defer func() { - bufferedReader.Reset(nil) - bufioReaderPool.Put(bufferedReader) - }() - - subtreeToCheck, err = subtreepkg.NewSubtreeFromReader(bufferedReader) - if err != nil { - return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to deserialize subtree", subtreeHash.String(), err) - } - } else { - url := fmt.Sprintf("%s/subtree/%s", request.BaseUrl, subtreeHash.String()) - - subtreeNodeBytes, err := util.DoHTTPRequest(ctx, url) - if err != nil { - return nil, errors.NewServiceError("[loadSubtreeOnly][%s] failed to get subtree from %s", subtreeHash.String(), url, err) - } - - if u.p2pClient != nil && peerID != "" { - if err := u.p2pClient.RecordBytesDownloaded(ctx, peerID, uint64(len(subtreeNodeBytes))); err != nil { - u.logger.Warnf("[loadSubtreeOnly][%s] failed to record bytes downloaded: %v", subtreeHash.String(), err) - } - } - - subtreeToCheck, err = subtreepkg.NewIncompleteTreeByLeafCount(len(subtreeNodeBytes) / chainhash.HashSize) - if err != nil { - return nil, 
errors.NewProcessingError("[loadSubtreeOnly][%s] failed to create subtree structure", subtreeHash.String(), err) - } - - var nodeHash chainhash.Hash - for i := 0; i < len(subtreeNodeBytes)/chainhash.HashSize; i++ { - copy(nodeHash[:], subtreeNodeBytes[i*chainhash.HashSize:(i+1)*chainhash.HashSize]) - - if nodeHash.Equal(subtreepkg.CoinbasePlaceholderHashValue) { - if err = subtreeToCheck.AddCoinbaseNode(); err != nil { - return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to add coinbase node", subtreeHash.String(), err) - } - } else { - if err = subtreeToCheck.AddNode(nodeHash, 0, 0); err != nil { - return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to add node", subtreeHash.String(), err) - } - } - } - - if !subtreeHash.Equal(*subtreeToCheck.RootHash()) { - return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] subtree root hash mismatch: %s", subtreeHash.String(), subtreeToCheck.RootHash().String()) - } - - subtreeBytes, err := subtreeToCheck.Serialize() - if err != nil { - return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to serialize subtree", subtreeHash.String(), err) - } - - if err = u.subtreeStore.Set(ctx, subtreeHash[:], fileformat.FileTypeSubtreeToCheck, subtreeBytes, options.WithDeleteAt(dah), options.WithAllowOverwrite(true)); err != nil { - return nil, errors.NewProcessingError("[loadSubtreeOnly][%s] failed to store subtree", subtreeHash.String(), err) - } - } - - txHashes := make([]chainhash.Hash, len(subtreeToCheck.Nodes)) - for j, node := range subtreeToCheck.Nodes { - txHashes[j] = node.Hash - } - - return txHashes, nil -} - // loadSubtreeTransactions loads transactions from a single subtree. // This is extracted from the original processMissingSubtrees logic. 
func (u *Server) loadSubtreeTransactions(ctx context.Context, request *subtreevalidation_api.CheckBlockSubtreesRequest, subtreeHash chainhash.Hash, peerID string, dah uint32) ([]*bt.Tx, error) { @@ -807,31 +722,8 @@ func (sp *streamingProcessor) streamAndFilterSubtrees( dah uint32, unvalidatedTxChan chan<- *bt.Tx, ) error { - needsSubtreeData := false - for i, subtreeHash := range missingSubtrees { - // Adaptive skip: try subtree-only path if no missing txs found yet - if !needsSubtreeData { - txHashes, loadErr := sp.server.loadSubtreeOnly(ctx, request, subtreeHash, peerID, dah) - if loadErr != nil { - sp.server.logger.Warnf("[streamAndFilterSubtrees] Failed to load subtree-only for %s: %v, falling back to full path", subtreeHash.String(), loadErr) - needsSubtreeData = true - } else { - allExist, checkErr := utxocheck.CheckAllTxsExistInUTXO(ctx, sp.server.utxoStore, txHashes, 1000) - if checkErr != nil { - sp.server.logger.Warnf("[streamAndFilterSubtrees] UTXO existence check failed for %s: %v, falling back to subtreeData", subtreeHash.String(), checkErr) - needsSubtreeData = true - } else if allExist { - sp.server.logger.Infof("[streamAndFilterSubtrees] Subtree %d/%d (%s): all %d txs exist in UTXO store, skipping subtreeData", i+1, len(missingSubtrees), subtreeHash.String(), len(txHashes)) - continue - } else { - sp.server.logger.Infof("[streamAndFilterSubtrees] Subtree %d/%d (%s): missing txs found, switching to subtreeData mode", i+1, len(missingSubtrees), subtreeHash.String()) - needsSubtreeData = true - } - } - } - - // Load subtree transactions (full path with subtreeData) + // Load subtree transactions txs, err := sp.server.loadSubtreeTransactions(ctx, request, subtreeHash, peerID, dah) if err != nil { return err