Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions internal/infrastructure/db/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func (s *service) updateProjectionsAfterOffchainTxEvents(events []domain.Event)
}
log.Debugf("spent %d vtxos", len(spentVtxos))
case offchainTx.IsFinalized():
txid, _, outs, err := s.txDecoder.DecodeTx(offchainTx.ArkTx)
txid, ins, outs, err := s.txDecoder.DecodeTx(offchainTx.ArkTx)
if err != nil {
log.WithError(err).Warn("failed to decode ark tx")
return
Expand All @@ -533,6 +533,23 @@ func (s *service) updateProjectionsAfterOffchainTxEvents(events []domain.Event)
return
}

txSwept := false
// if the tx is expired at finalization step, it may be possible the new outputs should be marked swept
// it depends if the inputs are swept or not
if offchainTx.ExpiryTimestamp > 0 &&
time.Now().After(time.Unix(offchainTx.ExpiryTimestamp, 0)) {
inputVtxos, err := s.vtxoStore.GetVtxos(ctx, ins)
// if an error happened, we assume the vtxo is swept. it should never happen but it's to avoid skipping adding vtxo to db
txSwept = err != nil
Comment on lines +541 to +543
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Do not infer txSwept=true from repository errors.

At Line 543, txSwept = err != nil turns transient DB read failures into permanent swept flags for all new outputs. That can incorrectly lock spendable vtxos.

Proposed fix
 		if offchainTx.ExpiryTimestamp > 0 &&
 			time.Now().After(time.Unix(offchainTx.ExpiryTimestamp, 0)) {
 			inputVtxos, err := s.vtxoStore.GetVtxos(ctx, ins)
-			// if an error happened, we assume the vtxo is swept. it should never happen but it's to avoid skipping adding vtxo to db
-			txSwept = err != nil
-
-			for _, inputVtxo := range inputVtxos {
-				if inputVtxo.Swept {
-					txSwept = true
-					break
-				}
-			}
+			if err != nil {
+				log.WithError(err).Warn(
+					"failed to infer swept state from input vtxos; falling back to script-based swept check",
+				)
+			} else {
+				for _, inputVtxo := range inputVtxos {
+					if inputVtxo.Swept {
+						txSwept = true
+						break
+					}
+				}
+			}
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/infrastructure/db/service.go` around lines 541 - 543, The code
currently sets txSwept = err != nil after calling s.vtxoStore.GetVtxos, which
incorrectly treats DB read errors as permanent swept state; remove the
assignment and instead handle GetVtxos errors explicitly (e.g., return the error
up, log and skip processing, or retry) so transient read failures don't mark
vtxos swept; determine txSwept only from the actual inputVtxos result (or
explicit spent flags) after a successful call to s.vtxoStore.GetVtxos rather
than based on err.


for _, inputVtxo := range inputVtxos {
if inputVtxo.Swept {
txSwept = true
break
}
}
}

// once the offchain tx is finalized, the user signed the checkpoint txs
// thus, we can create the new vtxos in the db.
newVtxos := make([]domain.Vtxo, 0, len(outs))
Expand All @@ -548,7 +565,10 @@ func (s *service) updateProjectionsAfterOffchainTxEvents(events []domain.Event)
continue
}

isDust := script.IsSubDustScript(out.PkScript)
outputSwept := txSwept
if !outputSwept {
outputSwept = script.IsSubDustScript(out.PkScript)
}

newVtxos = append(newVtxos, domain.Vtxo{
Outpoint: domain.Outpoint{
Expand All @@ -565,7 +585,7 @@ func (s *service) updateProjectionsAfterOffchainTxEvents(events []domain.Event)
// mark the vtxo as "swept" if it is below dust limit to prevent it from being spent again in a future offchain tx
// the only way to spend a swept vtxo is by collecting enough dust to cover the minSettlementVtxoAmount and then settle.
// because sub-dust vtxos are using OP_RETURN output script, they can't be unilaterally exited.
Swept: isDust,
Swept: outputSwept,
Assets: assets[uint32(outIndex)],
})
}
Expand Down
298 changes: 298 additions & 0 deletions internal/test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,8 @@ func TestOffchainTx(t *testing.T) {
)
})

// In this test, Alice submits a tx and then fetches the pending tx by proviving an intent
// to finalize it.
t.Run("finalize pending tx", func(t *testing.T) {
ctx := t.Context()
explorer, err := mempool_explorer.NewExplorer(
Expand Down Expand Up @@ -1022,6 +1024,302 @@ func TestOffchainTx(t *testing.T) {
require.Equal(t, txid, incomingFunds[0].Txid)
})

// In these tests, Alice submits an offchain tx and waits for the inputs to be swept before
// finalizing it.
// Covers both cases, the one where the inputs are swept by the server, and the other one,
// where they are expired but not yet swept.
// In both cases, the server should allow the finalization but the new vtxos should be marked
// as swept.
t.Run("finalize pending swept tx", func(t *testing.T) {
t.Run("vtxo already swept", func(t *testing.T) {
ctx := t.Context()
explorer, err := mempool_explorer.NewExplorer(
"http://localhost:3000", arklib.BitcoinRegTest,
mempool_explorer.WithTracker(false),
)
require.NoError(t, err)

alice, aliceWallet, _, arkSvc := setupArkSDKwithPublicKey(t)
t.Cleanup(func() { alice.Stop() })
t.Cleanup(func() { arkSvc.Close() })

vtxo := faucetOffchain(t, alice, 0.00021)

finalizedPendingTxs, err := alice.FinalizePendingTxs(ctx, nil)
require.NoError(t, err)
require.Empty(t, finalizedPendingTxs)

_, offchainAddresses, _, _, err := aliceWallet.GetAddresses(ctx)
require.NoError(t, err)
require.NotEmpty(t, offchainAddresses)
offchainAddress := offchainAddresses[0]

serverParams, err := arkSvc.GetInfo(ctx)
require.NoError(t, err)

vtxoScript, err := script.ParseVtxoScript(offchainAddress.Tapscripts)
require.NoError(t, err)
forfeitClosures := vtxoScript.ForfeitClosures()
require.Len(t, forfeitClosures, 1)
closure := forfeitClosures[0]

scriptBytes, err := closure.Script()
require.NoError(t, err)

_, vtxoTapTree, err := vtxoScript.TapTree()
require.NoError(t, err)

merkleProof, err := vtxoTapTree.GetTaprootMerkleProof(
txscript.NewBaseTapLeaf(scriptBytes).TapHash(),
)
require.NoError(t, err)

ctrlBlock, err := txscript.ParseControlBlock(merkleProof.ControlBlock)
require.NoError(t, err)

tapscript := &waddrmgr.Tapscript{
ControlBlock: ctrlBlock,
RevealedScript: merkleProof.Script,
}

checkpointTapscript, err := hex.DecodeString(serverParams.CheckpointTapscript)
require.NoError(t, err)

vtxoHash, err := chainhash.NewHashFromStr(vtxo.Txid)
require.NoError(t, err)

addr, err := arklib.DecodeAddressV0(offchainAddress.Address)
require.NoError(t, err)
pkscript, err := addr.GetPkScript()
require.NoError(t, err)

ptx, checkpointsPtx, err := offchain.BuildTxs(
[]offchain.VtxoInput{
{
Outpoint: &wire.OutPoint{
Hash: *vtxoHash,
Index: vtxo.VOut,
},
Tapscript: tapscript,
Amount: int64(vtxo.Amount),
RevealedTapscripts: offchainAddress.Tapscripts,
},
},
[]*wire.TxOut{
{
Value: int64(vtxo.Amount),
PkScript: pkscript,
},
},
checkpointTapscript,
)
require.NoError(t, err)

encodedCheckpoints := make([]string, 0, len(checkpointsPtx))
for _, checkpoint := range checkpointsPtx {
encoded, err := checkpoint.B64Encode()
require.NoError(t, err)
encodedCheckpoints = append(encodedCheckpoints, encoded)
}

// sign the ark transaction
encodedArkTx, err := ptx.B64Encode()
require.NoError(t, err)
signedArkTx, err := aliceWallet.SignTransaction(
ctx,
explorer,
encodedArkTx,
)
require.NoError(t, err)

txid, _, _, err := arkSvc.SubmitTx(ctx, signedArkTx, encodedCheckpoints)
require.NoError(t, err)
require.NotEmpty(t, txid)

// Make the vtxo expire
err = generateBlocks(21)
require.NoError(t, err)

// Givetime to the server to sweep the vtxo
time.Sleep(30 * time.Second)

// Ensure the vtxo is pending and swept
scriptStr := hex.EncodeToString(pkscript)
resp, err := alice.Indexer().GetVtxos(
ctx, indexer.WithScripts([]string{scriptStr}), indexer.WithPendingOnly(),
)
require.NoError(t, err)
require.NotNil(t, resp)
require.NotEmpty(t, resp.Vtxos)
require.True(t, resp.Vtxos[0].Spent)
require.True(t, resp.Vtxos[0].Swept)

var incomingFunds []types.Vtxo
var incomingErr error
wg := &sync.WaitGroup{}
wg.Go(func() {
incomingFunds, incomingErr = alice.NotifyIncomingFunds(ctx, offchainAddress.Address)
})

// Finalize the pending tx and ensure the new vtxo is marked as swept
finalizedTxIds, err := alice.FinalizePendingTxs(ctx, nil)
require.NoError(t, err)
require.NotEmpty(t, finalizedTxIds)
require.Equal(t, 1, len(finalizedTxIds))
require.Equal(t, txid, finalizedTxIds[0])

wg.Wait()
require.NoError(t, incomingErr)
require.NotEmpty(t, incomingFunds)
require.Len(t, incomingFunds, 1)
require.Equal(t, txid, incomingFunds[0].Txid)
require.True(t, incomingFunds[0].Swept)
})

t.Run("vtxo expired but not swept", func(t *testing.T) {
ctx := t.Context()
explorer, err := mempool_explorer.NewExplorer(
"http://localhost:3000", arklib.BitcoinRegTest,
mempool_explorer.WithTracker(false),
)
require.NoError(t, err)

alice, aliceWallet, _, arkSvc := setupArkSDKwithPublicKey(t)
t.Cleanup(func() { alice.Stop() })
t.Cleanup(func() { arkSvc.Close() })

vtxo := faucetOffchain(t, alice, 0.00021)

finalizedPendingTxs, err := alice.FinalizePendingTxs(ctx, nil)
require.NoError(t, err)
require.Empty(t, finalizedPendingTxs)

_, offchainAddresses, _, _, err := aliceWallet.GetAddresses(ctx)
require.NoError(t, err)
require.NotEmpty(t, offchainAddresses)
offchainAddress := offchainAddresses[0]

serverParams, err := arkSvc.GetInfo(ctx)
require.NoError(t, err)

vtxoScript, err := script.ParseVtxoScript(offchainAddress.Tapscripts)
require.NoError(t, err)
forfeitClosures := vtxoScript.ForfeitClosures()
require.Len(t, forfeitClosures, 1)
closure := forfeitClosures[0]

scriptBytes, err := closure.Script()
require.NoError(t, err)

_, vtxoTapTree, err := vtxoScript.TapTree()
require.NoError(t, err)

merkleProof, err := vtxoTapTree.GetTaprootMerkleProof(
txscript.NewBaseTapLeaf(scriptBytes).TapHash(),
)
require.NoError(t, err)

ctrlBlock, err := txscript.ParseControlBlock(merkleProof.ControlBlock)
require.NoError(t, err)

tapscript := &waddrmgr.Tapscript{
ControlBlock: ctrlBlock,
RevealedScript: merkleProof.Script,
}

checkpointTapscript, err := hex.DecodeString(serverParams.CheckpointTapscript)
require.NoError(t, err)

vtxoHash, err := chainhash.NewHashFromStr(vtxo.Txid)
require.NoError(t, err)

addr, err := arklib.DecodeAddressV0(offchainAddress.Address)
require.NoError(t, err)
pkscript, err := addr.GetPkScript()
require.NoError(t, err)

ptx, checkpointsPtx, err := offchain.BuildTxs(
[]offchain.VtxoInput{
{
Outpoint: &wire.OutPoint{
Hash: *vtxoHash,
Index: vtxo.VOut,
},
Tapscript: tapscript,
Amount: int64(vtxo.Amount),
RevealedTapscripts: offchainAddress.Tapscripts,
},
},
[]*wire.TxOut{
{
Value: int64(vtxo.Amount),
PkScript: pkscript,
},
},
checkpointTapscript,
)
require.NoError(t, err)

encodedCheckpoints := make([]string, 0, len(checkpointsPtx))
for _, checkpoint := range checkpointsPtx {
encoded, err := checkpoint.B64Encode()
require.NoError(t, err)
encodedCheckpoints = append(encodedCheckpoints, encoded)
}

// sign the ark transaction
encodedArkTx, err := ptx.B64Encode()
require.NoError(t, err)
signedArkTx, err := aliceWallet.SignTransaction(
ctx,
explorer,
encodedArkTx,
)
require.NoError(t, err)

txid, _, _, err := arkSvc.SubmitTx(ctx, signedArkTx, encodedCheckpoints)
require.NoError(t, err)
require.NotEmpty(t, txid)

// Make the vtxo expire
err = generateBlocks(21)
require.NoError(t, err)

// Don't give time to the server to mark the vtxo as swept
// Ensure the vtxo is pending but not swept yet
scriptStr := hex.EncodeToString(pkscript)
resp, err := alice.Indexer().GetVtxos(
ctx, indexer.WithScripts([]string{scriptStr}), indexer.WithPendingOnly(),
)
require.NoError(t, err)
require.NotNil(t, resp)
require.NotEmpty(t, resp.Vtxos)
require.True(t, resp.Vtxos[0].Spent)
require.False(t, resp.Vtxos[0].Swept)

var incomingFunds []types.Vtxo
var incomingErr error
wg := &sync.WaitGroup{}
wg.Go(func() {
incomingFunds, incomingErr = alice.NotifyIncomingFunds(ctx, offchainAddress.Address)
})

// Finalize the pending tx and ensure the new vtxo is still marked as swept
finalizedTxIds, err := alice.FinalizePendingTxs(ctx, nil)
require.NoError(t, err)
require.NotEmpty(t, finalizedTxIds)
require.Equal(t, 1, len(finalizedTxIds))
require.Equal(t, txid, finalizedTxIds[0])

wg.Wait()
require.NoError(t, incomingErr)
require.NotEmpty(t, incomingFunds)
require.Len(t, incomingFunds, 1)
require.Equal(t, txid, incomingFunds[0].Txid)
require.True(t, incomingFunds[0].Swept)
})
})

// In this test, we ensure that a tx with too many OP_RETURN outputs gets rejected.
// The server is configured with a max of 3 OP_RETURN outputs, so submitting 4 should fail.
t.Run("too many op return outputs", func(t *testing.T) {
Expand Down
Loading