Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions pkg/utxocheck/existence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package utxocheck

import (
"context"

"github.com/bsv-blockchain/go-bt/v2/chainhash"
subtreepkg "github.com/bsv-blockchain/go-subtree"
"github.com/bsv-blockchain/teranode/stores/utxo"
"github.com/bsv-blockchain/teranode/stores/utxo/fields"
)

// CheckAllTxsExistInUTXO batch-checks whether all transaction hashes exist in the UTXO store.
// It processes hashes in batches, skipping coinbase placeholders.
// Returns false immediately when any tx is missing or still in Creating state.
// Returns true only when ALL non-coinbase txs exist and are fully created.
// On UTXO store error, returns (false, err) so callers can fall back to subtreeData.
func CheckAllTxsExistInUTXO(ctx context.Context, utxoStore utxo.Store, txHashes []chainhash.Hash, batchSize int) (bool, error) {
if len(txHashes) == 0 {
return true, nil
}

if batchSize <= 0 {
batchSize = 1000
}

for i := 0; i < len(txHashes); i += batchSize {
end := i + batchSize
if end > len(txHashes) {
end = len(txHashes)
}

batch := make([]*utxo.UnresolvedMetaData, 0, end-i)
for j := i; j < end; j++ {
if txHashes[j].Equal(subtreepkg.CoinbasePlaceholderHashValue) {
continue
}
batch = append(batch, &utxo.UnresolvedMetaData{
Hash: txHashes[j],
Idx: j,
})
}

if len(batch) == 0 {
continue
}

if err := utxoStore.BatchDecorate(ctx, batch, fields.Creating); err != nil {
return false, err
}

for _, item := range batch {
if item.Err != nil || item.Data == nil || item.Data.Creating {
return false, nil
}
}
}

return true, nil
}
164 changes: 164 additions & 0 deletions pkg/utxocheck/existence_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package utxocheck

import (
"context"
"fmt"
"testing"

"github.com/bsv-blockchain/go-bt/v2/chainhash"
subtreepkg "github.com/bsv-blockchain/go-subtree"
"github.com/bsv-blockchain/teranode/stores/utxo"
"github.com/bsv-blockchain/teranode/stores/utxo/meta"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestCheckAllTxsExistInUTXO(t *testing.T) {
ctx := context.Background()

makeHash := func(i int) chainhash.Hash {
var h chainhash.Hash
copy(h[:], fmt.Sprintf("tx%032d", i))
return h
}

t.Run("empty hashes returns true", func(t *testing.T) {
mockStore := &utxo.MockUtxostore{}
result, err := CheckAllTxsExistInUTXO(ctx, mockStore, nil, 1000)
require.NoError(t, err)
assert.True(t, result)
})

t.Run("all txs exist returns true", func(t *testing.T) {
mockStore := &utxo.MockUtxostore{}
hashes := []chainhash.Hash{makeHash(1), makeHash(2), makeHash(3)}

mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
batch := args.Get(1).([]*utxo.UnresolvedMetaData)
for _, item := range batch {
item.Data = &meta.Data{Creating: false}
}
}).
Return(nil)

result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000)
require.NoError(t, err)
assert.True(t, result)
})

t.Run("missing tx returns false", func(t *testing.T) {
mockStore := &utxo.MockUtxostore{}
hashes := []chainhash.Hash{makeHash(1), makeHash(2), makeHash(3)}

mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
batch := args.Get(1).([]*utxo.UnresolvedMetaData)
batch[0].Data = &meta.Data{Creating: false}
// batch[1] left nil — missing tx
}).
Return(nil)

result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000)
require.NoError(t, err)
assert.False(t, result)
})

t.Run("creating tx returns false", func(t *testing.T) {
mockStore := &utxo.MockUtxostore{}
hashes := []chainhash.Hash{makeHash(1)}

mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
batch := args.Get(1).([]*utxo.UnresolvedMetaData)
batch[0].Data = &meta.Data{Creating: true}
}).
Return(nil)

result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000)
require.NoError(t, err)
assert.False(t, result)
})

t.Run("BatchDecorate error returns false with error", func(t *testing.T) {
mockStore := &utxo.MockUtxostore{}
hashes := []chainhash.Hash{makeHash(1)}

mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything).
Return(fmt.Errorf("store error"))

result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000)
require.Error(t, err)
assert.False(t, result)
})

t.Run("coinbase placeholder is skipped", func(t *testing.T) {
mockStore := &utxo.MockUtxostore{}
hashes := []chainhash.Hash{subtreepkg.CoinbasePlaceholderHashValue, makeHash(1)}

mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
batch := args.Get(1).([]*utxo.UnresolvedMetaData)
require.Len(t, batch, 1, "coinbase should be filtered out")
batch[0].Data = &meta.Data{Creating: false}
}).
Return(nil)

result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000)
require.NoError(t, err)
assert.True(t, result)
})

t.Run("batching works correctly", func(t *testing.T) {
mockStore := &utxo.MockUtxostore{}
hashes := make([]chainhash.Hash, 5)
for i := range hashes {
hashes[i] = makeHash(i)
}

callCount := 0
mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
batch := args.Get(1).([]*utxo.UnresolvedMetaData)
for _, item := range batch {
item.Data = &meta.Data{Creating: false}
}
callCount++
}).
Return(nil)

result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 2)
require.NoError(t, err)
assert.True(t, result)
assert.Equal(t, 3, callCount, "5 items with batch size 2 should create 3 batches")
})

t.Run("item error returns false", func(t *testing.T) {
mockStore := &utxo.MockUtxostore{}
hashes := []chainhash.Hash{makeHash(1)}

mockStore.On("BatchDecorate", mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
batch := args.Get(1).([]*utxo.UnresolvedMetaData)
batch[0].Err = fmt.Errorf("not found")
}).
Return(nil)

result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000)
require.NoError(t, err)
assert.False(t, result)
})

t.Run("only coinbase hashes returns true without calling store", func(t *testing.T) {
mockStore := &utxo.MockUtxostore{}
hashes := []chainhash.Hash{subtreepkg.CoinbasePlaceholderHashValue}

result, err := CheckAllTxsExistInUTXO(ctx, mockStore, hashes, 1000)
require.NoError(t, err)
assert.True(t, result)
// BatchDecorate should not be called since only coinbase hashes
mockStore.AssertNotCalled(t, "BatchDecorate")
})

}
107 changes: 105 additions & 2 deletions services/blockvalidation/get_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/bsv-blockchain/teranode/errors"
"github.com/bsv-blockchain/teranode/model"
"github.com/bsv-blockchain/teranode/pkg/fileformat"
"github.com/bsv-blockchain/teranode/pkg/utxocheck"
"github.com/bsv-blockchain/teranode/stores/blob/options"
"github.com/bsv-blockchain/teranode/util"
"github.com/bsv-blockchain/teranode/util/tracing"
Expand Down Expand Up @@ -206,8 +207,8 @@ func (u *Server) blockWorker(ctx context.Context, workerID int, workQueue <-chan
return nil
}

// Fetch subtree data for this block
err := u.fetchSubtreeDataForBlock(ctx, work.block, peerID, baseURL)
// Fetch subtree data for this block, adaptively skipping subtreeData when txs exist locally
err := u.fetchBlockSubtreesAdaptive(ctx, work.block, peerID, baseURL)
if err != nil {
// Send result (even if error occurred)
result := resultItem{
Expand Down Expand Up @@ -374,6 +375,108 @@ func (u *Server) fetchSubtreeDataForBlock(gCtx context.Context, block *model.Blo
return nil
}

// fetchBlockSubtreesAdaptive processes subtrees with per-subtree adaptive logic.
// For each subtree it first fetches just the subtree hashes and checks UTXO existence.
// If all txs exist locally, the subtreeData download is skipped entirely.
// Once any subtree has missing txs, all remaining subtrees go straight to full fetch.
func (u *Server) fetchBlockSubtreesAdaptive(gCtx context.Context, block *model.Block, peerID, baseURL string) error {
ctx, _, deferFn := tracing.Tracer("blockvalidation").Start(gCtx, "fetchBlockSubtreesAdaptive",
tracing.WithParentStat(u.stats),
tracing.WithLogMessage(u.logger, "[catchup:fetchBlockSubtreesAdaptive][%s] adaptive subtree fetch for block with %d subtrees", block.Hash().String(), len(block.Subtrees)),
)
defer deferFn()

if len(block.Subtrees) == 0 {
u.logger.Debugf("[catchup:fetchBlockSubtreesAdaptive] Block %s has no subtrees, skipping", block.Hash().String())
return nil
}

g, ctx := errgroup.WithContext(ctx)
subtreeConcurrency := 8
if u.settings.BlockValidation.SubtreeFetchConcurrency > 0 {
subtreeConcurrency = u.settings.BlockValidation.SubtreeFetchConcurrency
}
g.SetLimit(subtreeConcurrency)

// Get peer assignments for subtrees if parallel fetching is enabled
var peerAssignments []*PeerForSubtreeFetch
if u.settings.BlockValidation.CatchupParallelFetchEnabled && u.p2pClient != nil {
var err error
peerAssignments, err = DistributeSubtreesAcrossPeers(ctx, u.logger, u.p2pClient, peerID, baseURL, len(block.Subtrees))
if err != nil {
u.logger.Warnf("[catchup:fetchBlockSubtreesAdaptive][%s] Failed to distribute subtrees across peers: %v, using single peer", block.Hash().String(), err)
peerAssignments = nil
}
}

// Shared flag: once set, all subsequent subtrees skip UTXO probing
var needsSubtreeData atomic.Bool
// If no UTXO store is available, skip the optimization entirely
if u.utxoStore == nil {
needsSubtreeData.Store(true)
}

for i, subtreeHash := range block.Subtrees {
subtreeHashCopy := *subtreeHash
subtreeIndex := i

fetchPeerID := peerID
fetchBaseURL := baseURL
if peerAssignments != nil && subtreeIndex < len(peerAssignments) {
assignment := peerAssignments[subtreeIndex]
fetchPeerID = assignment.PeerID
fetchBaseURL = assignment.BaseURL
}

capturedPeerID := fetchPeerID
capturedBaseURL := fetchBaseURL

g.Go(func() error {
// If flag is already set, go straight to full fetch
if needsSubtreeData.Load() {
Copy link
Contributor

@github-actions github-actions bot Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The atomic flag pattern here has a race condition. When multiple goroutines check needsSubtreeData.Load() at line 436 before any have called .Store(true), they all proceed to the UTXO check. If one goroutine finds a missing tx and sets the flag, other goroutines that already passed the check will still download subtreeData even though the flag is now set.

Impact: The optimization won't be as effective as intended - some goroutines will unnecessarily download subtreeData even after the flag is set.

Solution: This is an inherent limitation of the optimization pattern and may be acceptable since it still provides bandwidth savings. However, if you want stricter behavior, consider checking the flag again before calling fetchAndStoreSubtreeData at lines 458 and 464.


Update:Fixed - The code now includes a re-check of the flag at line 449 after fetching the subtree, which closes the race window. This ensures goroutines that are in-flight will check the flag again before proceeding to download subtreeData.

return u.fetchAndStoreSubtreeAndSubtreeData(ctx, block, &subtreeHashCopy, capturedPeerID, capturedBaseURL)
}

// Try subtree-only: fetch just the hashes and check UTXO existence
subtree, err := u.fetchAndStoreSubtree(ctx, block, &subtreeHashCopy, capturedPeerID, capturedBaseURL)
if err != nil {
// Can't get subtree hashes, fall back to full fetch
needsSubtreeData.Store(true)
return u.fetchAndStoreSubtreeAndSubtreeData(ctx, block, &subtreeHashCopy, capturedPeerID, capturedBaseURL)
}

// Extract tx hashes from subtree nodes
txHashes := make([]chainhash.Hash, len(subtree.Nodes))
for j, node := range subtree.Nodes {
txHashes[j] = node.Hash
}

allExist, err := utxocheck.CheckAllTxsExistInUTXO(ctx, u.utxoStore, txHashes, 1000)
if err != nil {
u.logger.Warnf("[catchup:fetchBlockSubtreesAdaptive][%s] UTXO existence check failed for subtree %s: %v, falling back to subtreeData", block.Hash().String(), subtreeHashCopy.String(), err)
needsSubtreeData.Store(true)
return u.fetchAndStoreSubtreeData(ctx, block, &subtreeHashCopy, subtree, capturedPeerID, capturedBaseURL)
}

if !allExist {
u.logger.Infof("[catchup:fetchBlockSubtreesAdaptive][%s] Missing txs found in subtree %s, switching to subtreeData mode", block.Hash().String(), subtreeHashCopy.String())
needsSubtreeData.Store(true)
return u.fetchAndStoreSubtreeData(ctx, block, &subtreeHashCopy, subtree, capturedPeerID, capturedBaseURL)
}

// All txs exist locally - no subtreeData needed for this subtree
u.logger.Infof("[catchup:fetchBlockSubtreesAdaptive][%s] All %d txs exist in UTXO store, skipping subtreeData download for subtree %s", block.Hash().String(), len(txHashes), subtreeHashCopy.String())
return nil
})
}

if err := g.Wait(); err != nil {
return errors.NewServiceError("[catchup:fetchBlockSubtreesAdaptive] Failed to fetch subtrees for block %s", block.Hash().String(), err)
}

return nil
}

// fetchAndStoreSubtree fetches and stores only the subtree (for subtreeToCheck)
func (u *Server) fetchAndStoreSubtree(ctx context.Context, block *model.Block, subtreeHash *chainhash.Hash, peerID, baseURL string) (*subtreepkg.Subtree, error) {
ctx, _, deferFn := tracing.Tracer("blockvalidation").Start(ctx, "fetchAndStoreSubtree",
Expand Down
2 changes: 1 addition & 1 deletion services/blockvalidation/get_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2592,7 +2592,7 @@ func TestBlockWorker(t *testing.T) {
// Check result
result := <-resultQueue
assert.Error(t, result.err, "Should propagate subtree fetch error")
assert.Contains(t, result.err.Error(), "Failed to fetch subtree data for block")
assert.Contains(t, result.err.Error(), "Failed to fetch subtrees for block")
})

t.Run("WorkerHandlesEmptyQueue", func(t *testing.T) {
Expand Down
Loading
Loading