49 changes: 37 additions & 12 deletions checkpoint/recovery.go
@@ -9,6 +9,7 @@ import (
"net/url"
"path/filepath"
"slices"
"time"

"github.com/spf13/afero"
"go.uber.org/zap"
@@ -39,22 +40,29 @@ type Config struct {

// set to false if atxs are not compatible before and after the checkpoint recovery.
PreserveOwnAtx bool `mapstructure:"preserve-own-atx"`

RetryCount int `mapstructure:"retry-count"`
RetryInterval time.Duration `mapstructure:"retry-interval"`
}

func DefaultConfig() Config {
return Config{
PreserveOwnAtx: true,
RetryCount: 5,
RetryInterval: 3 * time.Second,
}
}

type RecoverConfig struct {
GoldenAtx types.ATXID
DataDir string
DbFile string
LocalDbFile string
NodeIDs []types.NodeID // IDs to preserve own ATXs
Uri string
Restore types.LayerID
GoldenAtx types.ATXID
DataDir string
DbFile string
LocalDbFile string
NodeIDs []types.NodeID // IDs to preserve own ATXs
Uri string
Restore types.LayerID
RetryCount int
RetryInterval time.Duration
}
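
Note: the new knobs flow from the package-level defaults into `RecoverConfig`. A minimal wiring sketch, not from the PR; the helper and its arguments are hypothetical, only the field names come from this diff:

```go
// Hypothetical helper: carry the retry defaults into a RecoverConfig.
func recoverConfigFromDefaults(dataDir, uri string, restore types.LayerID) *RecoverConfig {
	c := DefaultConfig() // RetryCount: 5, RetryInterval: 3 * time.Second
	return &RecoverConfig{
		DataDir:       dataDir,
		Uri:           uri,
		Restore:       restore,
		RetryCount:    c.RetryCount,
		RetryInterval: c.RetryInterval,
	}
}
```

node/node.go at the bottom of this diff does the same wiring from `app.Config.Recovery`.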

func (c *RecoverConfig) DbPath() string {
@@ -75,6 +83,8 @@ func copyToLocalFile(
fs afero.Fs,
dataDir, uri string,
restore types.LayerID,
retryCount int,
retryInterval time.Duration,
) (string, error) {
parsed, err := url.Parse(uri)
if err != nil {
@@ -89,11 +99,23 @@
logger.Info("old recovery data backed up", log.ZContext(ctx), zap.String("dir", bdir))
}
dst := RecoveryFilename(dataDir, filepath.Base(parsed.String()), restore)
if err = httpToLocalFile(ctx, parsed, fs, dst); err != nil {
return "", err
for range retryCount + 1 {
err = httpToLocalFile(ctx, parsed, fs, dst)
switch {
case errors.Is(err, ErrTransient):
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(retryInterval):
}
case err != nil:
return "", err
default:
logger.Info("checkpoint data persisted", log.ZContext(ctx), zap.String("file", dst))
return dst, nil
}
}
logger.Info("checkpoint data persisted", log.ZContext(ctx), zap.String("file", dst))
return dst, nil
return "", ErrCheckpointRequestFailed
}
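
Note the loop's semantics: `for range retryCount + 1` (Go 1.22 range-over-int) makes `RetryCount` the number of retries, so there are `RetryCount+1` download attempts in total. A standalone sketch of the worst-case time budget under the defaults; the final-sleep observation follows from the loop as written, which also waits after the last failed attempt:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	retryCount := 5                  // DefaultConfig
	retryInterval := 3 * time.Second // DefaultConfig
	attempts := retryCount + 1       // for range retryCount + 1
	// The select on ctx.Done()/time.After runs inside every iteration,
	// including the last one, so a permanently unavailable server costs:
	worst := time.Duration(attempts) * retryInterval
	fmt.Println(attempts, worst) // 6 18s
}
```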

type AtxDep struct {
@@ -170,7 +192,10 @@ func RecoverWithDb(
return nil, fmt.Errorf("remove old bootstrap data: %w", err)
}
logger.Info("recover from uri", zap.String("uri", cfg.Uri))
cpFile, err := copyToLocalFile(ctx, logger, fs, cfg.DataDir, cfg.Uri, cfg.Restore)
cpFile, err := copyToLocalFile(
ctx, logger, fs, cfg.DataDir, cfg.Uri, cfg.Restore,
cfg.RetryCount, cfg.RetryInterval,
)
if err != nil {
return nil, err
}
56 changes: 42 additions & 14 deletions checkpoint/recovery_test.go
@@ -9,6 +9,7 @@ import (
"net/http"
"net/http/httptest"
"path/filepath"
"sync/atomic"
"testing"
"time"

@@ -119,33 +120,57 @@ func verifyDbContent(tb testing.TB, db *sql.Database) {
require.Empty(tb, extra)
}

func checkpointServer(t testing.TB) string {
func checkpointServerWithFaultInjection(t testing.TB, succeed func() bool) string {
mux := http.NewServeMux()
mux.HandleFunc("GET /snapshot-15", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(checkpointData))
if succeed == nil || succeed() {
w.Write([]byte(checkpointData))
} else {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("service unavailable"))
}
})
ts := httptest.NewServer(mux)
t.Cleanup(ts.Close)
return ts.URL
}

func checkpointServer(t testing.TB) string {
return checkpointServerWithFaultInjection(t, nil)
}
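
The `succeed` callback makes failure patterns easy to script. A hypothetical variant of the usage in `TestRecover` below, failing the first two requests instead of one:

```go
// Hypothetical: the first two downloads get a 503, the third succeeds.
var calls atomic.Int32
url := checkpointServerWithFaultInjection(t, func() bool {
	return calls.Add(1) > 2
})
```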

func TestRecover(t *testing.T) {
t.Parallel()
url := checkpointServer(t)
var fail atomic.Bool
url := checkpointServerWithFaultInjection(t, func() bool {
// 2nd attempt will always succeed
return !fail.Swap(false)
})

tt := []struct {
name string
uri string
expErr error
name string
uri string
expErr error
reqFail bool
}{
{
name: "http",
uri: fmt.Sprintf("%s/snapshot-15", url),
},
{
name: "http+retry",
uri: fmt.Sprintf("%s/snapshot-15", url),
reqFail: true,
},
{
name: "not found",
uri: fmt.Sprintf("%s/snapshot-42", url),
expErr: checkpoint.ErrCheckpointNotFound,
},
{
name: "url unreachable",
uri: "http://nowhere/snapshot-15",
expErr: checkpoint.ErrCheckpointNotFound,
expErr: checkpoint.ErrCheckpointRequestFailed,
},
{
name: "ftp",
@@ -156,15 +181,18 @@ func TestRecover(t *testing.T) {

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
fail.Store(tc.reqFail)
fs := afero.NewMemMapFs()
cfg := &checkpoint.RecoverConfig{
GoldenAtx: goldenAtx,
DataDir: t.TempDir(),
DbFile: "test.sql",
LocalDbFile: "local.sql",
NodeIDs: []types.NodeID{types.RandomNodeID()},
Uri: tc.uri,
Restore: types.LayerID(recoverLayer),
GoldenAtx: goldenAtx,
DataDir: t.TempDir(),
DbFile: "test.sql",
LocalDbFile: "local.sql",
NodeIDs: []types.NodeID{types.RandomNodeID()},
Uri: tc.uri,
Restore: types.LayerID(recoverLayer),
RetryCount: 5,
RetryInterval: 100 * time.Millisecond,
}
bsdir := filepath.Join(cfg.DataDir, bootstrap.DirName)
require.NoError(t, fs.MkdirAll(bsdir, 0o700))
24 changes: 20 additions & 4 deletions checkpoint/util.go
@@ -24,8 +24,10 @@ import (
)

var (
ErrCheckpointNotFound = errors.New("checkpoint not found")
ErrUrlSchemeNotSupported = errors.New("url scheme not supported")
ErrTransient = errors.New("transient error")
ErrCheckpointNotFound = errors.New("checkpoint not found")
ErrCheckpointRequestFailed = errors.New("checkpoint request failed")
ErrUrlSchemeNotSupported = errors.New("url scheme not supported")
)

type RecoveryFile struct {
@@ -118,13 +120,27 @@ func httpToLocalFile(ctx context.Context, resource *url.URL, fs afero.Fs, dst st
urlErr := &url.Error{}
switch {
case errors.As(err, &urlErr):
return ErrCheckpointNotFound
// It makes sense to retry on network errors.
return ErrTransient
case err != nil:
// This shouldn't really happen. According to net/http docs for Do:
// "Any returned error will be of type *url.Error."
return fmt.Errorf("http get recovery file: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
switch {
case resp.StatusCode == http.StatusOK:
// Continue
case resp.StatusCode == http.StatusNotFound:
// The checkpoint is not found. This is not considered a fatal error.
return ErrCheckpointNotFound
case resp.StatusCode/100 == 4:
// Can't load the checkpoint because of a problem with the request itself,
// which is not likely to be resolved by retrying.
return ErrCheckpointRequestFailed
default:
// It makes sense to retry on other server errors.
return ErrTransient
}
rf, err := NewRecoveryFile(fs, dst)
if err != nil {
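
The three sentinels split failures into classes a caller can branch on with `errors.Is`. The retryable/terminal policy sketched here is illustrative, not part of the PR:

```go
// Illustrative only: how a caller distinguishes the sentinel errors.
func fetchOnce(ctx context.Context, parsed *url.URL, fs afero.Fs, dst string) error {
	err := httpToLocalFile(ctx, parsed, fs, dst)
	switch {
	case err == nil:
		return nil // downloaded; nothing more to do
	case errors.Is(err, ErrTransient):
		return err // worth retrying after a delay, as copyToLocalFile now does
	case errors.Is(err, ErrCheckpointNotFound):
		return err // no checkpoint published at this URL; not fatal for the node
	default:
		return err // ErrCheckpointRequestFailed or unexpected: give up
	}
}
```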
16 changes: 9 additions & 7 deletions node/node.go
@@ -436,13 +436,15 @@ func (app *App) loadCheckpoint(ctx context.Context) (*checkpoint.PreservedData,
}
}
cfg := &checkpoint.RecoverConfig{
GoldenAtx: types.ATXID(app.Config.Genesis.GoldenATX()),
DataDir: app.Config.DataDir(),
DbFile: dbFile,
LocalDbFile: localDbFile,
NodeIDs: nodeIDs,
Uri: app.Config.Recovery.Uri,
Restore: types.LayerID(app.Config.Recovery.Restore),
GoldenAtx: types.ATXID(app.Config.Genesis.GoldenATX()),
DataDir: app.Config.DataDir(),
DbFile: dbFile,
LocalDbFile: localDbFile,
NodeIDs: nodeIDs,
Uri: app.Config.Recovery.Uri,
Restore: types.LayerID(app.Config.Recovery.Restore),
RetryCount: app.Config.Recovery.RetryCount,
RetryInterval: app.Config.Recovery.RetryInterval,
}

return checkpoint.Recover(ctx, app.log.Zap(), afero.NewOsFs(), cfg)