diff --git a/pkg/utxocheck/existence.go b/pkg/utxocheck/existence.go new file mode 100644 index 0000000000..e1cec412f3 --- /dev/null +++ b/pkg/utxocheck/existence.go @@ -0,0 +1,59 @@ +package utxocheck + +import ( + "context" + + "github.com/bsv-blockchain/go-bt/v2/chainhash" + subtreepkg "github.com/bsv-blockchain/go-subtree" + "github.com/bsv-blockchain/teranode/stores/utxo" + "github.com/bsv-blockchain/teranode/stores/utxo/fields" +) + +// CheckAllTxsExistInUTXO batch-checks whether all transaction hashes exist in the UTXO store. +// It processes hashes in batches, skipping coinbase placeholders. +// Returns false immediately when any tx is missing or still in Creating state. +// Returns true only when ALL non-coinbase txs exist and are fully created. +// On UTXO store error, returns (false, err) so callers can fall back to subtreeData. +func CheckAllTxsExistInUTXO(ctx context.Context, utxoStore utxo.Store, txHashes []chainhash.Hash, batchSize int) (bool, error) { + if len(txHashes) == 0 { + return true, nil + } + + if batchSize <= 0 { + batchSize = 1000 + } + + for i := 0; i < len(txHashes); i += batchSize { + end := i + batchSize + if end > len(txHashes) { + end = len(txHashes) + } + + batch := make([]*utxo.UnresolvedMetaData, 0, end-i) + for j := i; j < end; j++ { + if txHashes[j].Equal(subtreepkg.CoinbasePlaceholderHashValue) { + continue + } + batch = append(batch, &utxo.UnresolvedMetaData{ + Hash: txHashes[j], + Idx: j, + }) + } + + if len(batch) == 0 { + continue + } + + if err := utxoStore.BatchDecorate(ctx, batch, fields.Creating); err != nil { + return false, err + } + + for _, item := range batch { + if item.Err != nil || item.Data == nil || item.Data.Creating { + return false, nil + } + } + } + + return true, nil +} diff --git a/pkg/utxocheck/existence_test.go b/pkg/utxocheck/existence_test.go new file mode 100644 index 0000000000..2d44391498 --- /dev/null +++ b/pkg/utxocheck/existence_test.go @@ -0,0 +1,164 @@ +package utxocheck + +import ( + "context" + "fmt" + "testing" + + "github.com/bsv-blockchain/go-bt/v2/chainhash" + subtreepkg "github.com/bsv-blockchain/go-subtree" + "github.com/bsv-blockchain/teranode/stores/utxo" + "github.com/bsv-blockchain/teranode/stores/utxo/meta" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestCheckAllTxsExistInUTXO(t *testing.T) { + ctx := context.Background() + + makeHash := func(i int) chainhash.Hash { + var h chainhash.Hash + copy(h[:], fmt.Sprintf("tx%032d", i)) + return h + } + + t.Run("empty hashes returns true", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, nil, 1000) + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("all txs exist returns true", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{makeHash(1), makeHash(2), makeHash(3)} + + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + batch := args.Get(1).([]*utxo.UnresolvedMetaData) + for _, item := range batch { + item.Data = &meta.Data{Creating: false} + } + }). + Return(nil) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("missing tx returns false", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{makeHash(1), makeHash(2), makeHash(3)} + + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + batch := args.Get(1).([]*utxo.UnresolvedMetaData) + batch[0].Data = &meta.Data{Creating: false} + // batch[1] left nil — missing tx + }). + Return(nil) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.NoError(t, err) + assert.False(t, result) + }) + + t.Run("creating tx returns false", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{makeHash(1)} + + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + batch := args.Get(1).([]*utxo.UnresolvedMetaData) + batch[0].Data = &meta.Data{Creating: true} + }). + Return(nil) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.NoError(t, err) + assert.False(t, result) + }) + + t.Run("BatchDecorate error returns false with error", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{makeHash(1)} + + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Return(fmt.Errorf("store error")) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.Error(t, err) + assert.False(t, result) + }) + + t.Run("coinbase placeholder is skipped", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{subtreepkg.CoinbasePlaceholderHashValue, makeHash(1)} + + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + batch := args.Get(1).([]*utxo.UnresolvedMetaData) + require.Len(t, batch, 1, "coinbase should be filtered out") + batch[0].Data = &meta.Data{Creating: false} + }). + Return(nil) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.NoError(t, err) + assert.True(t, result) + }) + + t.Run("batching works correctly", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := make([]chainhash.Hash, 5) + for i := range hashes { + hashes[i] = makeHash(i) + } + + callCount := 0 + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + batch := args.Get(1).([]*utxo.UnresolvedMetaData) + for _, item := range batch { + item.Data = &meta.Data{Creating: false} + } + callCount++ + }). + Return(nil) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 2) + require.NoError(t, err) + assert.True(t, result) + assert.Equal(t, 3, callCount, "5 items with batch size 2 should create 3 batches") + }) + + t.Run("item error returns false", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{makeHash(1)} + + mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + batch := args.Get(1).([]*utxo.UnresolvedMetaData) + batch[0].Err = fmt.Errorf("not found") + }). + Return(nil) + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.NoError(t, err) + assert.False(t, result) + }) + + t.Run("only coinbase hashes returns true without calling store", func(t *testing.T) { + mockStore := &utxo.MockUtxostore{} + hashes := []chainhash.Hash{subtreepkg.CoinbasePlaceholderHashValue} + + result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000) + require.NoError(t, err) + assert.True(t, result) + // BatchDecorate should not be called since only coinbase hashes + mockStore.AssertNotCalled(t, "BatchDecorate") + }) + +} diff --git a/services/blockvalidation/get_blocks.go b/services/blockvalidation/get_blocks.go index f28f72aafe..8f905763f8 100644 --- a/services/blockvalidation/get_blocks.go +++ b/services/blockvalidation/get_blocks.go @@ -14,6 +14,7 @@ import ( "github.com/bsv-blockchain/teranode/errors" "github.com/bsv-blockchain/teranode/model" "github.com/bsv-blockchain/teranode/pkg/fileformat" + "github.com/bsv-blockchain/teranode/pkg/utxocheck" "github.com/bsv-blockchain/teranode/stores/blob/options" "github.com/bsv-blockchain/teranode/util" "github.com/bsv-blockchain/teranode/util/tracing" @@ -206,8 +207,8 @@ func (u *Server) blockWorker(ctx context.Context, workerID int, workQueue <-chan return nil } - // Fetch subtree data for this block - err := u.fetchSubtreeDataForBlock(ctx, work.block, peerID, baseURL) + // Fetch subtree data for this block, adaptively skipping subtreeData when txs exist locally + err := u.fetchBlockSubtreesAdaptive(ctx, work.block, peerID, baseURL) if err != nil { // Send result (even if error occurred) result := resultItem{ @@ -374,6 +375,113 @@ func (u *Server) fetchSubtreeDataForBlock(gCtx context.Context, block *model.Blo return nil } +// fetchBlockSubtreesAdaptive processes subtrees with per-subtree adaptive logic. +// For each subtree it first fetches just the subtree hashes and checks UTXO existence. +// If all txs exist locally, the subtreeData download is skipped entirely. +// Once any subtree has missing txs, all remaining subtrees go straight to full fetch. +func (u *Server) fetchBlockSubtreesAdaptive(gCtx context.Context, block *model.Block, peerID, baseURL string) error { + ctx, _, deferFn := tracing.Tracer("blockvalidation").Start(gCtx, "fetchBlockSubtreesAdaptive", + tracing.WithParentStat(u.stats), + tracing.WithLogMessage(u.logger, "[catchup:fetchBlockSubtreesAdaptive][%s] adaptive subtree fetch for block with %d subtrees", block.Hash().String(), len(block.Subtrees)), + ) + defer deferFn() + + if len(block.Subtrees) == 0 { + u.logger.Debugf("[catchup:fetchBlockSubtreesAdaptive] Block %s has no subtrees, skipping", block.Hash().String()) + return nil + } + + g, ctx := errgroup.WithContext(ctx) + subtreeConcurrency := 8 + if u.settings.BlockValidation.SubtreeFetchConcurrency > 0 { + subtreeConcurrency = u.settings.BlockValidation.SubtreeFetchConcurrency + } + g.SetLimit(subtreeConcurrency) + + // Get peer assignments for subtrees if parallel fetching is enabled + var peerAssignments []*PeerForSubtreeFetch + if u.settings.BlockValidation.CatchupParallelFetchEnabled && u.p2pClient != nil { + var err error + peerAssignments, err = DistributeSubtreesAcrossPeers(ctx, u.logger, u.p2pClient, peerID, baseURL, len(block.Subtrees)) + if err != nil { + u.logger.Warnf("[catchup:fetchBlockSubtreesAdaptive][%s] Failed to distribute subtrees across peers: %v, using single peer", block.Hash().String(), err) + peerAssignments = nil + } + } + + // Shared flag: once set, all subsequent subtrees skip UTXO probing + var needsSubtreeData atomic.Bool + // If no UTXO store is available, skip the optimization entirely + if u.utxoStore == nil { + needsSubtreeData.Store(true) + } + + for i, subtreeHash := range block.Subtrees { + subtreeHashCopy := *subtreeHash + subtreeIndex := i + + fetchPeerID := peerID + fetchBaseURL := baseURL + if peerAssignments != nil && subtreeIndex < len(peerAssignments) { + assignment := peerAssignments[subtreeIndex] + fetchPeerID = assignment.PeerID + fetchBaseURL = assignment.BaseURL + } + + capturedPeerID := fetchPeerID + capturedBaseURL := fetchBaseURL + + g.Go(func() error { + // If flag is already set, go straight to full fetch + if needsSubtreeData.Load() { + return u.fetchAndStoreSubtreeAndSubtreeData(ctx, block, &subtreeHashCopy, capturedPeerID, capturedBaseURL) + } + + // Try subtree-only: fetch just the hashes and check UTXO existence + subtree, err := u.fetchAndStoreSubtree(ctx, block, &subtreeHashCopy, capturedPeerID, capturedBaseURL) + if err != nil { + // Can't get subtree hashes, fall back to full fetch + needsSubtreeData.Store(true) + return u.fetchAndStoreSubtreeAndSubtreeData(ctx, block, &subtreeHashCopy, capturedPeerID, capturedBaseURL) + } + + // Re-check flag: another goroutine may have set it while we were fetching hashes + if needsSubtreeData.Load() { + return u.fetchAndStoreSubtreeData(ctx, block, &subtreeHashCopy, subtree, capturedPeerID, capturedBaseURL) + } + + // Extract tx hashes from subtree nodes + txHashes := make([]chainhash.Hash, len(subtree.Nodes)) + for j, node := range subtree.Nodes { + txHashes[j] = node.Hash + } + + allExist, err := utxocheck.CheckAllTxsExistInUTXO(ctx, u.utxoStore, txHashes, 1000) + if err != nil { + u.logger.Warnf("[catchup:fetchBlockSubtreesAdaptive][%s] UTXO existence check failed for subtree %s: %v, falling back to subtreeData", block.Hash().String(), subtreeHashCopy.String(), err) + needsSubtreeData.Store(true) + return u.fetchAndStoreSubtreeData(ctx, block, &subtreeHashCopy, subtree, capturedPeerID, capturedBaseURL) + } + + if !allExist { + u.logger.Infof("[catchup:fetchBlockSubtreesAdaptive][%s] Missing txs found in subtree %s, switching to subtreeData mode", block.Hash().String(), subtreeHashCopy.String()) + needsSubtreeData.Store(true) + return u.fetchAndStoreSubtreeData(ctx, block, &subtreeHashCopy, subtree, capturedPeerID, capturedBaseURL) + } + + // All txs exist locally - no subtreeData needed for this subtree + u.logger.Infof("[catchup:fetchBlockSubtreesAdaptive][%s] All %d txs exist in UTXO store, skipping subtreeData download for subtree %s", block.Hash().String(), len(txHashes), subtreeHashCopy.String()) + return nil + }) + } + + if err := g.Wait(); err != nil { + return errors.NewServiceError("[catchup:fetchBlockSubtreesAdaptive] Failed to fetch subtrees for block %s", block.Hash().String(), err) + } + + return nil +} + // fetchAndStoreSubtree fetches and stores only the subtree (for subtreeToCheck) func (u *Server) fetchAndStoreSubtree(ctx context.Context, block *model.Block, subtreeHash *chainhash.Hash, peerID, baseURL string) (*subtreepkg.Subtree, error) { ctx, _, deferFn := tracing.Tracer("blockvalidation").Start(ctx, "fetchAndStoreSubtree", diff --git a/services/blockvalidation/get_blocks_test.go b/services/blockvalidation/get_blocks_test.go index d2ba976cd7..7de7c7ea5c 100644 --- a/services/blockvalidation/get_blocks_test.go +++ b/services/blockvalidation/get_blocks_test.go @@ -2592,7 +2592,7 @@ func TestBlockWorker(t *testing.T) { // Check result result := <-resultQueue assert.Error(t, result.err, "Should propagate subtree fetch error") - assert.Contains(t, result.err.Error(), "Failed to fetch subtree data for block") + assert.Contains(t, result.err.Error(), "Failed to fetch subtrees for block") }) t.Run("WorkerHandlesEmptyQueue", func(t *testing.T) {