Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
525 changes: 360 additions & 165 deletions services/blockassembly/subtreeprocessor/SubtreeProcessor.go

Large diffs are not rendered by default.

40 changes: 26 additions & 14 deletions services/blockassembly/subtreeprocessor/SubtreeProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3410,7 +3410,7 @@ func TestRemoveCoinbaseUtxosChildrenRemoval(t *testing.T) {
// the removal of child transactions when processing coinbase UTXOs through
// the removeCoinbaseUtxos function integration.
func TestMoveBackBlockChildrenRemoval(t *testing.T) {
t.Run("moveBackBlockCreateNewSubtrees_integration_with_child_removal", func(t *testing.T) {
t.Run("moveBackBlock_integration_with_child_removal", func(t *testing.T) {
ctx := context.Background()

// Setup test environment
Expand All @@ -3434,7 +3434,10 @@ func TestMoveBackBlockChildrenRemoval(t *testing.T) {
}()
defer close(newSubtreeChan)

stp, err := NewSubtreeProcessor(ctx, ulogger.TestLogger{}, settings, blobStore, nil, utxoStore, newSubtreeChan)
blockchainClient := &blockchain.Mock{}
blockchainClient.On("SetBlockProcessedAt", mock.Anything, mock.Anything, mock.Anything).Return(nil)

stp, err := NewSubtreeProcessor(ctx, ulogger.TestLogger{}, settings, blobStore, blockchainClient, utxoStore, newSubtreeChan)
require.NoError(t, err)

// Use existing coinbase transaction from test data
Expand All @@ -3444,7 +3447,7 @@ func TestMoveBackBlockChildrenRemoval(t *testing.T) {
_, err = utxoStore.Create(ctx, coinbase, 1)
require.NoError(t, err)

// Create block with empty subtrees (so moveBackBlockCreateNewSubtrees only calls removeCoinbaseUtxos)
// Create block with empty subtrees (so moveBackBlock only calls removeCoinbaseUtxos)
block := &model.Block{
CoinbaseTx: coinbase,
Header: &model.BlockHeader{
Expand All @@ -3458,9 +3461,9 @@ func TestMoveBackBlockChildrenRemoval(t *testing.T) {
Subtrees: []*chainhash.Hash{}, // Empty to focus on removeCoinbaseUtxos call
}

// Call moveBackBlockCreateNewSubtrees directly
_, _, err = stp.moveBackBlockCreateNewSubtrees(ctx, block, true)
require.NoError(t, err, "moveBackBlockCreateNewSubtrees should succeed")
// Call moveBackBlock directly
_, _, err = stp.moveBackBlock(ctx, block, true)
require.NoError(t, err, "moveBackBlock should succeed")
})
}

Expand Down Expand Up @@ -3800,17 +3803,26 @@ func TestRemoveCoinbaseUtxos_MissingTransaction(t *testing.T) {
})
}

func TestMoveBackBlockCreateNewSubtrees_ErrorRecovery(t *testing.T) {
t.Run("moveBackBlockCreateNewSubtrees handles partial processing failures", func(t *testing.T) {
func TestMoveBackBlock_ErrorRecovery(t *testing.T) {
t.Run("moveBackBlock handles partial processing failures", func(t *testing.T) {
newSubtreeChan := make(chan NewSubtreeRequest)
settings := test.CreateBaseTestSettings(t)

// Create a memory blob store for testing
blobStore := blob_memory.New()

stp, _ := NewSubtreeProcessor(t.Context(), ulogger.TestLogger{}, settings, blobStore, nil, nil, newSubtreeChan)
blockchainClient := &blockchain.Mock{}
blockchainClient.On("SetBlockProcessedAt", mock.Anything, mock.Anything, mock.Anything).Return(nil)

ctx := t.Context()
utxoStoreURL, err := url.Parse("sqlitememory:///test")
require.NoError(t, err)
utxoStore, err := sql.New(ctx, ulogger.TestLogger{}, settings, utxoStoreURL)
require.NoError(t, err)

stp, _ := NewSubtreeProcessor(ctx, ulogger.TestLogger{}, settings, blobStore, blockchainClient, utxoStore, newSubtreeChan)
stp.InitCurrentBlockHeader(prevBlockHeader)
stp.Start(t.Context())
stp.Start(ctx)

// Create a block with subtrees but corrupted data
subtreeHash := chainhash.HashH([]byte("subtree1"))
Expand All @@ -3829,19 +3841,19 @@ func TestMoveBackBlockCreateNewSubtrees_ErrorRecovery(t *testing.T) {

// Store corrupted subtree data in blob store
corruptedData := []byte("corrupted_subtree_data")
err := stp.subtreeStore.Set(context.Background(), subtreeHash[:], fileformat.FileTypeSubtree, corruptedData)
err = stp.subtreeStore.Set(context.Background(), subtreeHash[:], fileformat.FileTypeSubtree, corruptedData)
require.NoError(t, err)

// Capture state before operation
originalState := captureSubtreeProcessorState(stp)

// Call moveBackBlockCreateNewSubtrees
_, _, err = stp.moveBackBlockCreateNewSubtrees(context.Background(), block, true)
// Call moveBackBlock
_, _, err = stp.moveBackBlock(context.Background(), block, true)

// Should handle corrupted data gracefully or return appropriate error
if err != nil {
// If error occurred, verify state remains unchanged
assertStateUnchanged(t, stp, originalState, "moveBackBlockCreateNewSubtrees with corrupted data")
assertStateUnchanged(t, stp, originalState, "moveBackBlock with corrupted data")
}
})
}
Expand Down
37 changes: 0 additions & 37 deletions services/blockassembly/subtreeprocessor/hash_slice_pool.go

This file was deleted.

108 changes: 93 additions & 15 deletions services/blockassembly/subtreeprocessor/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,55 +89,133 @@ func (s *SplitSwissMap) Iter(f func(hash chainhash.Hash, v struct{}) bool) {
}
}

// txInpointsBucket is one shard of a SplitTxInpointsMap: a swiss map from
// transaction hash to that transaction's input outpoints, guarded by its own
// mutex so different shards can be accessed concurrently without contending
// on a single global lock.
type txInpointsBucket struct {
	mu sync.Mutex // guards m; all reads and writes of m must hold mu
	m  *swiss.Map[chainhash.Hash, *subtreepkg.TxInpoints]
}

type SplitTxInpointsMap struct {
m map[uint16]*txmap.SyncedMap[chainhash.Hash, *subtreepkg.TxInpoints]
buckets []txInpointsBucket
nrOfBuckets uint16
}

func NewSplitTxInpointsMap(nrOfBuckets uint16) *SplitTxInpointsMap {
m := make(map[uint16]*txmap.SyncedMap[chainhash.Hash, *subtreepkg.TxInpoints], nrOfBuckets)
buckets := make([]txInpointsBucket, nrOfBuckets)
for i := uint16(0); i < nrOfBuckets; i++ {
m[i] = txmap.NewSyncedMap[chainhash.Hash, *subtreepkg.TxInpoints]()
buckets[i].m = swiss.NewMap[chainhash.Hash, *subtreepkg.TxInpoints](64)
}

return &SplitTxInpointsMap{
m: m,
buckets: buckets,
nrOfBuckets: nrOfBuckets,
}
}

func (s *SplitTxInpointsMap) Delete(hash chainhash.Hash) bool {
return s.m[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)].Delete(hash)
b := &s.buckets[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)]
b.mu.Lock()
ok := b.m.Has(hash)
if ok {
b.m.Delete(hash)
}
b.mu.Unlock()
return ok
}

func (s *SplitTxInpointsMap) Exists(hash chainhash.Hash) bool {
return s.m[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)].Exists(hash)
b := &s.buckets[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)]
b.mu.Lock()
ok := b.m.Has(hash)
b.mu.Unlock()
return ok
}

func (s *SplitTxInpointsMap) Get(hash chainhash.Hash) (*subtreepkg.TxInpoints, bool) {
return s.m[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)].Get(hash)
b := &s.buckets[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)]
b.mu.Lock()
v, ok := b.m.Get(hash)
b.mu.Unlock()
return v, ok
}

func (s *SplitTxInpointsMap) Length() int {
length := 0

for _, syncedMap := range s.m {
length += syncedMap.Length()
for i := uint16(0); i < s.nrOfBuckets; i++ {
b := &s.buckets[i]
b.mu.Lock()
length += b.m.Count()
b.mu.Unlock()
}

return length
}

func (s *SplitTxInpointsMap) Set(hash chainhash.Hash, inpoints *subtreepkg.TxInpoints) {
s.m[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)].Set(hash, inpoints)
b := &s.buckets[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)]
b.mu.Lock()
b.m.Put(hash, inpoints)
b.mu.Unlock()
}

func (s *SplitTxInpointsMap) SetIfNotExists(hash chainhash.Hash, inpoints *subtreepkg.TxInpoints) (*subtreepkg.TxInpoints, bool) {
return s.m[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)].SetIfNotExists(hash, inpoints)
b := &s.buckets[txmap.Bytes2Uint16Buckets(hash, s.nrOfBuckets)]
b.mu.Lock()
if existing, ok := b.m.Get(hash); ok {
b.mu.Unlock()
return existing, false
}
b.m.Put(hash, inpoints)
b.mu.Unlock()
return inpoints, true
}

func (s *SplitTxInpointsMap) Clear() {
for _, syncedMap := range s.m {
syncedMap.Clear()
for i := uint16(0); i < s.nrOfBuckets; i++ {
b := &s.buckets[i]
b.mu.Lock()
b.m = swiss.NewMap[chainhash.Hash, *subtreepkg.TxInpoints](64)
b.mu.Unlock()
}
}

// ParallelBulkSetIfNotExists inserts multiple entries in parallel, grouped by bucket.
// Each non-empty bucket is handled by its own goroutine, which acquires that
// bucket's lock exactly once for all of its inserts. wasSet[i] is set to true
// when hashes[i] was newly inserted (i.e. not already present); entries that
// already existed leave wasSet[i] untouched. The caller must size inpoints and
// wasSet to match hashes.
func (s *SplitTxInpointsMap) ParallelBulkSetIfNotExists(
	hashes []chainhash.Hash,
	inpoints []*subtreepkg.TxInpoints,
	wasSet []bool,
) {
	if len(hashes) == 0 {
		return
	}

	// Phase 1: lock-free O(N) pass that records, per bucket, which input
	// positions hash into it.
	perBucket := make([][]int, s.nrOfBuckets)
	for idx := range hashes {
		bkt := txmap.Bytes2Uint16Buckets(hashes[idx], s.nrOfBuckets)
		perBucket[bkt] = append(perBucket[bkt], idx)
	}

	// Phase 2: fan out one goroutine per non-empty bucket. The bucket and its
	// index list are passed as explicit arguments so each goroutine works on
	// its own values regardless of loop-variable semantics.
	var wg sync.WaitGroup
	for bkt, idxs := range perBucket {
		if len(idxs) == 0 {
			continue
		}

		wg.Add(1)
		go func(b *txInpointsBucket, idxs []int) {
			defer wg.Done()

			b.mu.Lock()
			defer b.mu.Unlock()

			for _, i := range idxs {
				if b.m.Has(hashes[i]) {
					continue // already present: leave wasSet[i] as-is
				}
				b.m.Put(hashes[i], inpoints[i])
				wasSet[i] = true
			}
		}(&s.buckets[bkt], idxs)
	}
	wg.Wait()
}
Loading
Loading