diff --git a/bchain/coins/avalanche/evm.go b/bchain/coins/avalanche/evm.go index c5ad36b98c..7593593ce1 100644 --- a/bchain/coins/avalanche/evm.go +++ b/bchain/coins/avalanche/evm.go @@ -93,9 +93,12 @@ func (c *AvalancheRPCClient) EthSubscribe(ctx context.Context, channel interface func (c *AvalancheRPCClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { err := c.Client.CallContext(ctx, result, method, args...) // unfinalized data cannot be queried error returned when trying to query a block height greater than last finalized block - // do not throw rpc error and instead treat as ErrBlockNotFound + // treat as ErrBlockNotFound so sync retries instead of processing an empty result // https://docs.avax.network/quickstart/exchanges/integrate-exchange-with-avalanche#determining-finality - if err != nil && !strings.Contains(err.Error(), "cannot query unfinalized data") { + if err != nil { + if strings.Contains(err.Error(), "cannot query unfinalized data") { + return bchain.ErrBlockNotFound + } return err } return nil diff --git a/bchain/coins/avalanche/evm_test.go b/bchain/coins/avalanche/evm_test.go new file mode 100644 index 0000000000..d589d79040 --- /dev/null +++ b/bchain/coins/avalanche/evm_test.go @@ -0,0 +1,73 @@ +package avalanche + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/ethereum/go-ethereum/rpc" + "github.com/trezor/blockbook/bchain" +) + +type testAvalancheRPCService struct{} + +func (s *testAvalancheRPCService) Unfinalized() (string, error) { + return "", errors.New("cannot query unfinalized data") +} + +func (s *testAvalancheRPCService) OtherError() (string, error) { + return "", errors.New("other failure") +} + +func (s *testAvalancheRPCService) Success() (string, error) { + return "ok", nil +} + +func newTestAvalancheRPCClient(t *testing.T) *AvalancheRPCClient { + t.Helper() + + server := rpc.NewServer() + if err := server.RegisterName("test", &testAvalancheRPCService{}); err != nil { + t.Fatalf("RegisterName() error = %v", err) + } + client := rpc.DialInProc(server) + t.Cleanup(func() { + client.Close() + server.Stop() + }) + + return &AvalancheRPCClient{Client: client} +} + +func TestAvalancheRPCClientCallContextMapsUnfinalizedDataToBlockNotFound(t *testing.T) { + client := newTestAvalancheRPCClient(t) + + var result string + err := client.CallContext(context.Background(), &result, "test_unfinalized") + if !errors.Is(err, bchain.ErrBlockNotFound) { + t.Fatalf("CallContext() error = %v, want ErrBlockNotFound", err) + } +} + +func TestAvalancheRPCClientCallContextReturnsOtherErrors(t *testing.T) { + client := newTestAvalancheRPCClient(t) + + var result string + err := client.CallContext(context.Background(), &result, "test_otherError") + if err == nil || !strings.Contains(err.Error(), "other failure") { + t.Fatalf("CallContext() error = %v, want other failure", err) + } +} + +func TestAvalancheRPCClientCallContextReturnsResult(t *testing.T) { + client := newTestAvalancheRPCClient(t) + + var result string + if err := client.CallContext(context.Background(), &result, "test_success"); err != nil { + t.Fatalf("CallContext() error = %v", err) + } + if result != "ok" { + t.Fatalf("result = %q, want %q", result, "ok") + } +} diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index e5232b0de6..d313f6e720 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "encoding/json" + stdErrors "errors" "fmt" "io" "math/big" @@ 
-901,7 +902,7 @@ func (b *EthereumRPC) GetBlockHash(height uint32) (string, error) { defer cancel() h, err := b.Client.HeaderByNumber(ctx, &n) if err != nil { - if err == ethereum.NotFound { + if err == ethereum.NotFound || stdErrors.Is(err, bchain.ErrBlockNotFound) { return "", bchain.ErrBlockNotFound } return "", errors.Annotatef(err, "height %v", height) diff --git a/bchain/coins/eth/ethrpc_blockhash_test.go b/bchain/coins/eth/ethrpc_blockhash_test.go new file mode 100644 index 0000000000..9eede81634 --- /dev/null +++ b/bchain/coins/eth/ethrpc_blockhash_test.go @@ -0,0 +1,92 @@ +package eth + +import ( + "context" + stdErrors "errors" + "math/big" + "testing" + "time" + + ethereum "github.com/ethereum/go-ethereum" + "github.com/trezor/blockbook/bchain" +) + +type stubEVMHeader struct { + hash string + num *big.Int +} + +func (h *stubEVMHeader) Hash() string { return h.hash } +func (h *stubEVMHeader) Number() *big.Int { return h.num } +func (h *stubEVMHeader) Difficulty() *big.Int { return big.NewInt(0) } + +type stubEVMClient struct { + header bchain.EVMHeader + err error +} + +func (s *stubEVMClient) NetworkID(ctx context.Context) (*big.Int, error) { return nil, nil } +func (s *stubEVMClient) HeaderByNumber(ctx context.Context, number *big.Int) (bchain.EVMHeader, error) { + if s.err != nil { + return nil, s.err + } + return s.header, nil +} +func (s *stubEVMClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) { return nil, nil } +func (s *stubEVMClient) EstimateGas(ctx context.Context, msg interface{}) (uint64, error) { + return 0, nil +} +func (s *stubEVMClient) BalanceAt(ctx context.Context, addrDesc bchain.AddressDescriptor, blockNumber *big.Int) (*big.Int, error) { + return nil, nil +} +func (s *stubEVMClient) NonceAt(ctx context.Context, addrDesc bchain.AddressDescriptor, blockNumber *big.Int) (uint64, error) { + return 0, nil +} + +func newTestEthereumRPC(client bchain.EVMClient) *EthereumRPC { + return &EthereumRPC{Client: client, Timeout: time.Second} +} + +func TestGetBlockHashMapsErrBlockNotFound(t *testing.T) { + // AvalancheClient.HeaderByNumber returns bchain.ErrBlockNotFound directly + // after the unfinalized-data fix. GetBlockHash must recognize that and + // surface ErrBlockNotFound to sync.go's stdErrors.Is check; otherwise + // fork detection breaks because juju/errors v0.0.0-2017 has no Unwrap. 
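+	// (stdErrors.Is only follows Unwrap chains, so with that juju/errors
+	// version an Annotatef-wrapped ErrBlockNotFound would not match; the
+	// sentinel has to be returned as-is.)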
+	b := newTestEthereumRPC(&stubEVMClient{err: bchain.ErrBlockNotFound})
+	_, err := b.GetBlockHash(123)
+	if !stdErrors.Is(err, bchain.ErrBlockNotFound) {
+		t.Fatalf("GetBlockHash() error = %v, want ErrBlockNotFound", err)
+	}
+}
+
+func TestGetBlockHashMapsEthereumNotFound(t *testing.T) {
+	b := newTestEthereumRPC(&stubEVMClient{err: ethereum.NotFound})
+	_, err := b.GetBlockHash(123)
+	if !stdErrors.Is(err, bchain.ErrBlockNotFound) {
+		t.Fatalf("GetBlockHash() error = %v, want ErrBlockNotFound", err)
+	}
+}
+
+func TestGetBlockHashPropagatesOtherErrors(t *testing.T) {
+	other := stdErrors.New("rpc connection refused")
+	b := newTestEthereumRPC(&stubEVMClient{err: other})
+	_, err := b.GetBlockHash(123)
+	if err == nil {
+		t.Fatal("GetBlockHash() error = nil, want propagated error")
+	}
+	if stdErrors.Is(err, bchain.ErrBlockNotFound) {
+		t.Fatalf("GetBlockHash() error = %v, must not match ErrBlockNotFound", err)
+	}
+}
+
+func TestGetBlockHashReturnsHash(t *testing.T) {
+	header := &stubEVMHeader{hash: "0xdeadbeef", num: big.NewInt(123)}
+	b := newTestEthereumRPC(&stubEVMClient{header: header})
+	got, err := b.GetBlockHash(123)
+	if err != nil {
+		t.Fatalf("GetBlockHash() error = %v", err)
+	}
+	if got != "0xdeadbeef" {
+		t.Fatalf("GetBlockHash() = %q, want %q", got, "0xdeadbeef")
+	}
+}
diff --git a/blockbook.go b/blockbook.go
index 31bb2f477e..395efdbde0 100644
--- a/blockbook.go
+++ b/blockbook.go
@@ -547,19 +547,30 @@ func syncIndexLoop() {
 	defer close(chanSyncIndexDone)
 	glog.Info("syncIndexLoop starting")
 	// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
+	logSyncErr := func(err error, suffix string) {
+		// Transient backend conditions (timeouts, missing-block-at-tip, connection blips)
+		// are expected during sync and the loop recovers from them by retrying,
+		// so demote them to warnings and keep the error level, which operators
+		// typically alert on, high-signal.
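+		// db.IsTransientSyncError applies the same classification the sync
+		// worker itself uses when deciding whether to retry a failed block fetch.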
+ msg := errors.ErrorStack(err) + suffix + if db.IsTransientSyncError(err) { + glog.Warning("syncIndexLoop ", msg) + } else { + glog.Error("syncIndexLoop ", msg) + } + } common.TickAndDebounce(time.Duration(*resyncIndexPeriodMs)*time.Millisecond, time.Duration(*resyncIndexDebounceMs)*time.Millisecond, chanSyncIndex, func() { if err := syncWorker.ResyncIndex(onNewBlock, false); err != nil { if err == db.ErrOperationInterrupted || common.IsInShutdown() { return } - glog.Error("syncIndexLoop ", errors.ErrorStack(err), ", will retry...") + logSyncErr(err, ", will retry...") // retry once in case of random network error, after a slight delay time.Sleep(time.Millisecond * 2500) if err := syncWorker.ResyncIndex(onNewBlock, false); err != nil { if err == db.ErrOperationInterrupted || common.IsInShutdown() { return } - glog.Error("syncIndexLoop ", errors.ErrorStack(err)) + logSyncErr(err, "") } } }) diff --git a/db/sync.go b/db/sync.go index bfb43028ee..0293a7617b 100644 --- a/db/sync.go +++ b/db/sync.go @@ -138,19 +138,25 @@ func (w *SyncWorker) ResyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b case nil: d := time.Since(start) glog.Info("resync: finished in ", d) - w.metrics.IndexResyncDuration.Observe(float64(d) / 1e6) // in milliseconds - w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk())) + if w.metrics != nil { + w.metrics.IndexResyncDuration.Observe(float64(d) / 1e6) // in milliseconds + w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk())) + } bh, _, err := w.db.GetBestBlock() if err == nil { w.is.FinishedSync(bh) } - w.metrics.BackendBestHeight.Set(float64(w.is.BackendInfo.Blocks)) - w.metrics.BlockbookBestHeight.Set(float64(bh)) + if w.metrics != nil { + w.metrics.BackendBestHeight.Set(float64(w.is.BackendInfo.Blocks)) + w.metrics.BlockbookBestHeight.Set(float64(bh)) + } return err case errSynced: // this is not actually error but flag that resync wasn't necessary w.is.FinishedSyncNoChange() - w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk())) + if w.metrics != nil { + w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk())) + } if initialSync { d := time.Since(start) glog.Info("resync: finished in ", d) @@ -158,7 +164,9 @@ func (w *SyncWorker) ResyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b return nil } - w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc() + if w.metrics != nil { + w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc() + } return err } @@ -208,7 +216,11 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b return err } if remoteBestHeight < w.startHeight { - glog.Error("resync: error - remote best height ", remoteBestHeight, " less than sync start height ", w.startHeight) + // Transient race: between reading remoteBestHash and remoteBestHeight + // the backend can briefly report a tip a few blocks behind what we + // already indexed (load-balanced RPC pools, in-flight reorgs). The + // outer sync loop retries from this on its own. 
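+			// The error string returned below is matched by
+			// isRetryableGetBlockError, so this path stays classified as transient.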
+ glog.Warning("resync: remote best height ", remoteBestHeight, " less than sync start height ", w.startHeight) return errors.New("resync: remote best height error") } if initialSync { @@ -305,7 +317,9 @@ func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc, initialSync if onNewBlock != nil { onNewBlock(res.block) } - w.metrics.BlockbookBestHeight.Set(float64(res.block.Height)) + if w.metrics != nil { + w.metrics.BlockbookBestHeight.Set(float64(res.block.Height)) + } if res.block.Height > 0 && res.block.Height%1000 == 0 { glog.Info("connected block ", res.block.Height, " ", res.block.Hash) } @@ -373,6 +387,22 @@ func (w *SyncWorker) shouldRestartSyncOnMissingBlock(height uint32, expectedHash return currentHash != expectedHash, nil } +func (w *SyncWorker) shouldRestartSyncOnMissingBlockHash(height uint32) (bool, error) { + bestHeight, err := w.chain.GetBestBlockHeight() + if err != nil { + return false, err + } + return bestHeight < height, nil +} + +// IsTransientSyncError reports whether err originates from a transient +// backend condition (timeouts, connection blips, missing-block-at-tip, etc.) +// that the sync loop will retry from. Callers can use it to log such errors +// at warning level instead of error. +func IsTransientSyncError(err error) bool { + return isRetryableGetBlockError(err) +} + func isRetryableGetBlockError(err error) bool { if err == nil { return false @@ -410,7 +440,8 @@ func isRetryableGetBlockError(err error) bool { strings.Contains(msg, "503 service unavailable"), strings.Contains(msg, "504 gateway timeout"), strings.Contains(msg, "header not found"), - strings.Contains(msg, "block not found"): + strings.Contains(msg, "block not found"), + strings.Contains(msg, "remote best height error"): return true default: return false @@ -423,6 +454,83 @@ func isRetryableGetBlockError(err error) bool { return cause != nil && isRetryable(cause) } +// getBlockHashForSync returns ok=false and err=nil when the caller should retry +// the same height after a delay. 
+func (w *SyncWorker) getBlockHashForSync(height uint32, retries *int) (string, bool, error) { + hash, err := w.chain.GetBlockHash(height) + if err == nil { + *retries = 0 + return hash, true, nil + } + if isRetryableGetBlockError(err) { + glog.Warning("GetBlockHash error ", err) + } else { + glog.Error("GetBlockHash error ", err) + } + if w.metrics != nil { + w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc() + } + if !isRetryableGetBlockError(err) { + return "", false, err + } + *retries++ + threshold := w.missingBlockRetry.RecheckThreshold + if threshold < 1 { + threshold = 1 + } + if *retries < threshold { + return "", false, nil + } + *retries = 0 + restart, checkErr := w.shouldRestartSyncOnMissingBlockHash(height) + if checkErr != nil { + glog.Error("GetBlockHash missing block check error ", checkErr) + return "", false, nil + } + if restart { + glog.Warning("sync: block hash at height ", height, " no longer on chain, restarting sync") + return "", false, errResync + } + return "", false, nil +} + +func recordBlockWorkerAbort(errp *error, abortErr error, context string) { + if stdErrors.Is(abortErr, errResync) || IsTransientSyncError(abortErr) { + glog.Warning("sync: ", context, " aborted, restarting sync: ", abortErr) + } else { + glog.Error("sync: ", context, " aborted, worker error ", abortErr) + } + if *errp == nil { + *errp = abortErr + } +} + +func (w *SyncWorker) waitForBlockWorkers(wg *sync.WaitGroup, abortCh <-chan error, closeTerminating func(), errp *error, context string) { + workersDone := make(chan struct{}) + go func() { + wg.Wait() + close(workersDone) + }() + + shutdownCh := w.chanOsSignal + for { + select { + case <-workersDone: + return + case abortErr := <-abortCh: + recordBlockWorkerAbort(errp, abortErr, context) + closeTerminating() + case <-shutdownCh: + shutdownCh = nil + if *errp == nil { + glog.Info(context, " interrupted while waiting for workers") + *errp = ErrOperationInterrupted + } + closeTerminating() + } + } +} + // ParallelConnectBlocks uses parallel goroutines to get data from blockchain daemon but keeps Blockbook in func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, lower, higher uint32, syncWorkers uint32) error { var err error @@ -440,6 +548,12 @@ func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, low // Keep it buffered so the first worker can report without blocking while the // coordinator is closing channels/terminating. 
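+	// terminating is now closed from several paths (worker abort, OS signal,
+	// late tail error), hence the sync.Once guard in closeTerminating below.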
abortCh := make(chan error, 1) + var terminateOnce sync.Once + closeTerminating := func() { + terminateOnce.Do(func() { + close(terminating) + }) + } writeBlockWorker := func() { defer close(writeBlockDone) lastBlock := lower - 1 @@ -462,7 +576,9 @@ func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, low if onNewBlock != nil { onNewBlock(b) } - w.metrics.BlockbookBestHeight.Set(float64(b.Height)) + if w.metrics != nil { + w.metrics.BlockbookBestHeight.Set(float64(b.Height)) + } if b.Height > 0 && b.Height%1000 == 0 { glog.Info("connected block ", b.Height, " ", b.Hash) @@ -484,49 +600,60 @@ func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, low } go writeBlockWorker() var hash string + hashRetries := 0 ConnectLoop: for h := lower; h <= higher; { select { case abortErr := <-abortCh: - if stdErrors.Is(abortErr, errResync) { - glog.Warning("sync: parallel connect aborted, restarting sync") - } else { - glog.Error("sync: parallel connect aborted, worker error ", abortErr) - } - err = abortErr - close(terminating) + recordBlockWorkerAbort(&err, abortErr, "parallel connect") + closeTerminating() break ConnectLoop case <-w.chanOsSignal: glog.Info("connectBlocksParallel interrupted at height ", h) err = ErrOperationInterrupted // signal all workers to terminate their loops (error loops are interrupted below) - close(terminating) + closeTerminating() break ConnectLoop default: - hash, err = w.chain.GetBlockHash(h) - if err != nil { - glog.Error("GetBlockHash error ", err) - w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc() + var gotHash bool + var hashErr error + hash, gotHash, hashErr = w.getBlockHashForSync(h, &hashRetries) + if hashErr != nil { + err = hashErr + closeTerminating() + break ConnectLoop + } + if !gotHash { time.Sleep(time.Millisecond * 500) continue } - hch <- hashHeight{hash, h} - h++ + select { + case hch <- hashHeight{hash, h}: + h++ + case abortErr := <-abortCh: + recordBlockWorkerAbort(&err, abortErr, "parallel connect") + closeTerminating() + break ConnectLoop + case <-w.chanOsSignal: + glog.Info("connectBlocksParallel interrupted at height ", h) + err = ErrOperationInterrupted + closeTerminating() + break ConnectLoop + } } } close(hch) // signal stop to workers that are in a error loop hchClosed.Store(true) - // wait for workers and close bch that will stop writer loop - wg.Wait() + // wait for workers while still observing abortCh; otherwise a late worker + // error can leave the writer waiting for a block that will never arrive. + w.waitForBlockWorkers(&wg, abortCh, closeTerminating, &err, "parallel connect") // Hardening: a worker can report a terminal tail error after ConnectLoop has // already ended (for example once hchClosed=true). Drain once so we return // that error instead of silently succeeding. select { case abortErr := <-abortCh: - if err == nil { - err = abortErr - } + recordBlockWorkerAbort(&err, abortErr, "parallel connect") default: } for i := 0; i < int(syncWorkers); i++ { @@ -559,7 +686,7 @@ GetBlockLoop: if err != nil { if isRetryableGetBlockError(err) { notFoundRetries++ - glog.Error("getBlockWorker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err, ". Retrying...") + glog.Warning("getBlockWorker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err, ". Retrying...") threshold := cfg.RecheckThreshold // Once the hash queue is closed we are at the tail of the range; use // a smaller threshold to avoid stalling on a missing tip block. 
@@ -581,21 +708,16 @@ GetBlockLoop: } } } else { - // When the hash queue is closed, stop retrying non-retryable errors. - if hchClosed.Load() == true { - glog.Error("getBlockWorker ", i, " connect block error ", err, ". Exiting...") - // Hardening: without surfacing this tail failure, the worker could - // exit and leave the sync loop stuck until manual restart. - select { - case abortCh <- err: - default: - } - return + glog.Error("getBlockWorker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err, ". Exiting...") + select { + case abortCh <- err: + default: } - notFoundRetries = 0 - glog.Error("getBlockWorker ", i, " connect block error ", err, ". Retrying...") + return + } + if w.metrics != nil { + w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc() } - w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc() select { case <-terminating: return @@ -636,6 +758,12 @@ func (w *SyncWorker) BulkConnectBlocks(lower, higher uint32) error { // Keep it buffered so the first worker can report without blocking while the // coordinator is closing channels/terminating. abortCh := make(chan error, 1) + var terminateOnce sync.Once + closeTerminating := func() { + terminateOnce.Do(func() { + close(terminating) + }) + } writeBlockWorker := func() { defer close(writeBlockDone) bc, err := w.db.InitBulkConnect() @@ -676,38 +804,51 @@ func (w *SyncWorker) BulkConnectBlocks(lower, higher uint32) error { } go writeBlockWorker() var hash string + hashRetries := 0 start := time.Now() msTime := time.Now().Add(1 * time.Minute) ConnectLoop: for h := lower; h <= higher; { select { case abortErr := <-abortCh: - if stdErrors.Is(abortErr, errResync) { - // Another worker observed a missing block that no longer matches the chain. 
- glog.Warning("sync: bulk connect aborted, restarting sync") - } else { - glog.Error("sync: bulk connect aborted, worker error ", abortErr) - } - err = abortErr - close(terminating) + recordBlockWorkerAbort(&err, abortErr, "bulk connect") + closeTerminating() break ConnectLoop case <-w.chanOsSignal: glog.Info("connectBlocksParallel interrupted at height ", h) err = ErrOperationInterrupted // signal all workers to terminate their loops (error loops are interrupted below) - close(terminating) + closeTerminating() break ConnectLoop default: - hash, err = w.chain.GetBlockHash(h) - if err != nil { - glog.Error("GetBlockHash error ", err) - w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc() + var gotHash bool + var hashErr error + hash, gotHash, hashErr = w.getBlockHashForSync(h, &hashRetries) + if hashErr != nil { + err = hashErr + closeTerminating() + break ConnectLoop + } + if !gotHash { time.Sleep(time.Millisecond * 500) continue } - hch <- hashHeight{hash, h} + select { + case hch <- hashHeight{hash, h}: + case abortErr := <-abortCh: + recordBlockWorkerAbort(&err, abortErr, "bulk connect") + closeTerminating() + break ConnectLoop + case <-w.chanOsSignal: + glog.Info("connectBlocksParallel interrupted at height ", h) + err = ErrOperationInterrupted + closeTerminating() + break ConnectLoop + } if h > 0 && h%1000 == 0 { - w.metrics.BlockbookBestHeight.Set(float64(h)) + if w.metrics != nil { + w.metrics.BlockbookBestHeight.Set(float64(h)) + } glog.Info("connecting block ", h, " ", hash, ", elapsed ", time.Since(start), " ", w.db.GetAndResetConnectBlockStats()) start = time.Now() } @@ -715,7 +856,9 @@ ConnectLoop: if glog.V(1) { glog.Info(w.db.GetMemoryStats()) } - w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk())) + if w.metrics != nil { + w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk())) + } msTime = time.Now().Add(10 * time.Minute) } h++ @@ -724,15 +867,14 @@ ConnectLoop: close(hch) // signal stop to workers that are in a error loop hchClosed.Store(true) - // wait for workers and close bch that will stop writer loop - wg.Wait() + // wait for workers while still observing abortCh; otherwise a late worker + // error can leave the writer waiting for a block that will never arrive. + w.waitForBlockWorkers(&wg, abortCh, closeTerminating, &err, "bulk connect") // Hardening: capture a late worker error reported after the connect loop // exits so the caller can retry instead of treating sync as successful. select { case abortErr := <-abortCh: - if err == nil { - err = abortErr - } + recordBlockWorkerAbort(&err, abortErr, "bulk connect") default: } for i := 0; i < w.syncWorkers; i++ { diff --git a/db/sync_test.go b/db/sync_test.go index 5c115f806b..535e6cb3fd 100644 --- a/db/sync_test.go +++ b/db/sync_test.go @@ -8,8 +8,12 @@ import ( "io" "net" "net/url" + "os" + "sync" + "sync/atomic" "syscall" "testing" + "time" jujuErrors "github.com/juju/errors" "github.com/trezor/blockbook/bchain" @@ -119,3 +123,436 @@ func TestIsRetryableGetBlockError(t *testing.T) { }) } } + +// TestIsTransientSyncErrorClassifiesAvaxAnnotatedNotFound covers the public +// classifier used by syncIndexLoop to demote retryable errors to warnings. +// The annotated ErrBlockNotFound shape comes from the Avalanche unfinalized +// data path: AvalancheRPCClient.CallContext returns ErrBlockNotFound, which +// getBlockRaw then wraps with `errors.Annotatef(err, "hash %v, height %v", ...)`. 
+func TestIsTransientSyncErrorClassifiesAvaxAnnotatedNotFound(t *testing.T) { + annotated := jujuErrors.Annotatef(bchain.ErrBlockNotFound, "hash %v, height %v", "", uint32(84420029)) + if !IsTransientSyncError(annotated) { + t.Fatalf("IsTransientSyncError(%v) = false, want true", annotated) + } + + if !IsTransientSyncError(stdErrors.New("resync: remote best height error")) { + t.Fatal("synthetic 'remote best height error' must be classified as transient") + } + + if IsTransientSyncError(nil) { + t.Fatal("nil must not be classified as transient") + } +} + +type missingHashChain struct { + bchain.BlockChain + bestHeight uint32 +} + +func (c *missingHashChain) GetBlockHash(height uint32) (string, error) { + return "", bchain.ErrBlockNotFound +} + +func (c *missingHashChain) GetBestBlockHeight() (uint32, error) { + return c.bestHeight, nil +} + +func TestGetBlockHashForSyncReturnsResyncWhenRequestedHeightPastTip(t *testing.T) { + w := &SyncWorker{ + chain: &missingHashChain{bestHeight: 9}, + missingBlockRetry: MissingBlockRetryConfig{RecheckThreshold: 2}, + } + retries := 0 + + hash, ok, err := w.getBlockHashForSync(10, &retries) + if hash != "" || ok || err != nil { + t.Fatalf("first getBlockHashForSync() = %q, %v, %v; want retry", hash, ok, err) + } + if retries != 1 { + t.Fatalf("retries = %d, want 1", retries) + } + + hash, ok, err = w.getBlockHashForSync(10, &retries) + if hash != "" || ok || !stdErrors.Is(err, errResync) { + t.Fatalf("second getBlockHashForSync() = %q, %v, %v; want errResync", hash, ok, err) + } +} + +type nonRetryableGetBlockChain struct { + bchain.BlockChain +} + +func (c *nonRetryableGetBlockChain) GetBlock(hash string, height uint32) (*bchain.Block, error) { + return nil, stdErrors.New("bad block") +} + +func TestGetBlockWorkerAbortsOnNonRetryableError(t *testing.T) { + hch := make(chan hashHeight, 1) + hch <- hashHeight{hash: "hash", height: 1} + close(hch) + + bch := []chan *bchain.Block{make(chan *bchain.Block)} + hchClosed := atomic.Value{} + hchClosed.Store(false) + terminating := make(chan struct{}) + abortCh := make(chan error, 1) + w := &SyncWorker{chain: &nonRetryableGetBlockChain{}} + + var wg sync.WaitGroup + wg.Add(1) + go w.getBlockWorker(0, 1, &wg, hch, bch, &hchClosed, terminating, abortCh) + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("getBlockWorker did not exit after non-retryable error") + } + + select { + case err := <-abortCh: + if err == nil || err.Error() != "bad block" { + t.Fatalf("abort error = %v, want bad block", err) + } + default: + t.Fatal("expected abort error") + } +} + +func TestWaitForBlockWorkersTerminatesOnAbort(t *testing.T) { + var wg sync.WaitGroup + terminating := make(chan struct{}) + abortCh := make(chan error, 1) + workerReleased := make(chan struct{}) + + wg.Add(1) + go func() { + defer wg.Done() + abortCh <- stdErrors.New("tail block failed") + <-terminating + close(workerReleased) + }() + + var closeOnce sync.Once + var err error + w := &SyncWorker{} + w.waitForBlockWorkers(&wg, abortCh, func() { + closeOnce.Do(func() { + close(terminating) + }) + }, &err, "test connect") + + if err == nil || err.Error() != "tail block failed" { + t.Fatalf("waitForBlockWorkers() error = %v, want tail block failed", err) + } + select { + case <-workerReleased: + case <-time.After(time.Second): + t.Fatal("worker was not released after abort") + } +} + +// TestWaitForBlockWorkersUnsticksBlockedWorker is the regression test for the +// 
coordinator-deadlock fix: one worker reports a fatal error via abortCh while +// another worker is parked on a bch send with no consumer. Without the +// abort-aware wait, wg.Wait() would block forever because the second worker +// can only escape by selecting on terminating. +func TestWaitForBlockWorkersUnsticksBlockedWorker(t *testing.T) { + var wg sync.WaitGroup + terminating := make(chan struct{}) + abortCh := make(chan error, 1) + bch := make(chan *bchain.Block) // unbuffered, no consumer + + wg.Add(1) + go func() { + defer wg.Done() + abortCh <- stdErrors.New("worker fatal") + }() + + wg.Add(1) + workerReleased := make(chan struct{}) + go func() { + defer wg.Done() + select { + case bch <- &bchain.Block{}: + case <-terminating: + close(workerReleased) + } + }() + + var closeOnce sync.Once + var err error + w := &SyncWorker{} + + done := make(chan struct{}) + go func() { + w.waitForBlockWorkers(&wg, abortCh, func() { + closeOnce.Do(func() { + close(terminating) + }) + }, &err, "test") + close(done) + }() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("waitForBlockWorkers did not return; coordinator deadlocked") + } + + select { + case <-workerReleased: + default: + t.Fatal("blocked worker was not released by closeTerminating") + } + + if err == nil || err.Error() != "worker fatal" { + t.Fatalf("err = %v, want worker fatal", err) + } +} + +func TestWaitForBlockWorkersHandlesOsSignal(t *testing.T) { + var wg sync.WaitGroup + terminating := make(chan struct{}) + abortCh := make(chan error, 1) + osSignal := make(chan os.Signal, 1) + + wg.Add(1) + go func() { + defer wg.Done() + <-terminating + }() + + var closeOnce sync.Once + var err error + w := &SyncWorker{chanOsSignal: osSignal} + + done := make(chan struct{}) + go func() { + w.waitForBlockWorkers(&wg, abortCh, func() { + closeOnce.Do(func() { + close(terminating) + }) + }, &err, "test") + close(done) + }() + + osSignal <- syscall.SIGTERM + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("waitForBlockWorkers did not return on OS signal") + } + + if !stdErrors.Is(err, ErrOperationInterrupted) { + t.Fatalf("err = %v, want ErrOperationInterrupted", err) + } +} + +func TestRecordBlockWorkerAbortKeepsFirstError(t *testing.T) { + var err error + first := stdErrors.New("first") + recordBlockWorkerAbort(&err, first, "test") + if err == nil || err.Error() != "first" { + t.Fatalf("err = %v, want first", err) + } + recordBlockWorkerAbort(&err, stdErrors.New("second"), "test") + if err.Error() != "first" { + t.Fatalf("err = %v, want first preserved", err) + } + + // errResync routes through the warning branch but should still be captured + // when no prior error exists. 
+ var resyncErr error + recordBlockWorkerAbort(&resyncErr, errResync, "test") + if !stdErrors.Is(resyncErr, errResync) { + t.Fatalf("resyncErr = %v, want errResync", resyncErr) + } +} + +type reorgChain struct { + bchain.BlockChain + bestHeight uint32 + currentHash string + hashErr error +} + +func (c *reorgChain) GetBlock(hash string, height uint32) (*bchain.Block, error) { + return nil, bchain.ErrBlockNotFound +} + +func (c *reorgChain) GetBestBlockHeight() (uint32, error) { + return c.bestHeight, nil +} + +func (c *reorgChain) GetBlockHash(height uint32) (string, error) { + return c.currentHash, c.hashErr +} + +func TestShouldRestartSyncOnMissingBlock(t *testing.T) { + tests := []struct { + name string + chain *reorgChain + height uint32 + expected string + wantRestart bool + wantErr bool + }{ + { + name: "tip below requested height", + chain: &reorgChain{bestHeight: 5}, + height: 10, + expected: "expected", + wantRestart: true, + }, + { + name: "block hash gone", + chain: &reorgChain{bestHeight: 20, hashErr: bchain.ErrBlockNotFound}, + height: 10, + expected: "expected", + wantRestart: true, + }, + { + name: "different hash at height", + chain: &reorgChain{bestHeight: 20, currentHash: "different"}, + height: 10, + expected: "expected", + wantRestart: true, + }, + { + name: "matching hash", + chain: &reorgChain{bestHeight: 20, currentHash: "expected"}, + height: 10, + expected: "expected", + wantRestart: false, + }, + { + name: "GetBlockHash error propagates", + chain: &reorgChain{bestHeight: 20, hashErr: stdErrors.New("rpc fail")}, + height: 10, + expected: "expected", + wantErr: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + w := &SyncWorker{chain: tc.chain} + restart, err := w.shouldRestartSyncOnMissingBlock(tc.height, tc.expected) + if tc.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if restart != tc.wantRestart { + t.Errorf("restart = %v, want %v", restart, tc.wantRestart) + } + }) + } +} + +// TestGetBlockWorkerSignalsResyncOnReorg verifies that once the per-block +// retry threshold is exceeded and the chain confirms a reorg, the worker +// aborts the parallel sync via errResync rather than spinning forever. 
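+// The stub backend reports best height 5 while the worker retries height 10,
+// the tip-behind shape that shouldRestartSyncOnMissingBlock treats as a reorg.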
+func TestGetBlockWorkerSignalsResyncOnReorg(t *testing.T) {
+	hch := make(chan hashHeight, 1)
+	hch <- hashHeight{hash: "expected", height: 10}
+	close(hch)
+
+	bch := []chan *bchain.Block{make(chan *bchain.Block)}
+	hchClosed := atomic.Value{}
+	hchClosed.Store(true) // tail-of-range path uses TipRecheckThreshold
+	terminating := make(chan struct{})
+	abortCh := make(chan error, 1)
+
+	w := &SyncWorker{
+		chain: &reorgChain{bestHeight: 5},
+		missingBlockRetry: MissingBlockRetryConfig{
+			RecheckThreshold:    5,
+			TipRecheckThreshold: 1,
+			RetryDelay:          time.Millisecond,
+		},
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go w.getBlockWorker(0, 1, &wg, hch, bch, &hchClosed, terminating, abortCh)
+
+	done := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		t.Fatal("getBlockWorker did not exit after reorg detection")
+	}
+
+	select {
+	case err := <-abortCh:
+		if !stdErrors.Is(err, errResync) {
+			t.Fatalf("abort err = %v, want errResync", err)
+		}
+	default:
+		t.Fatal("expected abortCh to receive errResync")
+	}
+}
+
+// TestGetBlockWorkerExitsOnTerminating verifies that a worker parked on its
+// block-channel send exits promptly when the coordinator closes the
+// terminating channel.
+func TestGetBlockWorkerExitsOnTerminating(t *testing.T) {
+	hch := make(chan hashHeight, 1)
+	hch <- hashHeight{hash: "h", height: 1}
+	// keep hch open so the worker cannot exit via the closed-queue path
+
+	bch := []chan *bchain.Block{make(chan *bchain.Block)}
+	hchClosed := atomic.Value{}
+	hchClosed.Store(false)
+	terminating := make(chan struct{})
+	abortCh := make(chan error, 1)
+
+	w := &SyncWorker{
+		chain: &blockingSendChain{},
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go w.getBlockWorker(0, 1, &wg, hch, bch, &hchClosed, terminating, abortCh)
+
+	// Give the worker a moment to enter the bch send.
+	time.Sleep(20 * time.Millisecond)
+	close(terminating)
+
+	done := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		t.Fatal("getBlockWorker did not exit on terminating close")
+	}
+}
+
+type blockingSendChain struct {
+	bchain.BlockChain
+}
+
+func (c *blockingSendChain) GetBlock(hash string, height uint32) (*bchain.Block, error) {
+	return &bchain.Block{BlockHeader: bchain.BlockHeader{Hash: hash, Height: height}}, nil
+}