Skip to content

Commit

Permalink
etcdctl: add etcdctl snapshot pipe command
Browse files Browse the repository at this point in the history
  • Loading branch information
Ais8Ooz8 authored Mar 18, 2024
1 parent 11fb917 commit 1a4add0
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 58 deletions.
82 changes: 25 additions & 57 deletions client/v3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"os"
"strings"
"time"

"github.com/dustin/go-humanize"
Expand All @@ -37,15 +38,14 @@ func hasChecksum(n int64) bool {
return (n % 512) == sha256.Size
}

// SaveWithVersion fetches snapshot from remote etcd server, saves data
// GetSnapshotWithVersion fetches snapshot from remote etcd server, saves data
// to target path and returns server version. If the context "ctx" is canceled or timed out,
// snapshot save stream will error out (e.g. context.Canceled,
// context.DeadlineExceeded). Make sure to specify only one endpoint
// in client configuration. Snapshot API must be requested to a
// selected node, and saved snapshot is the point-in-time state of
// the selected node.
// Etcd <v3.6 will return "" as version.
func SaveWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath string) (string, error) {
// the selected GetSnapshotWithVersion. Etcd < v3.6 will return "" as version.
func GetSnapshotWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, f *os.File) (string, error) {
cfg.Logger = lg.Named("client")
if len(cfg.Endpoints) != 1 {
return "", fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)
Expand All @@ -56,21 +56,12 @@ func SaveWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, d
}
defer cli.Close()

partpath := dbPath + ".part"
defer os.RemoveAll(partpath)

var f *os.File
f, err = os.OpenFile(partpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fileutil.PrivateFileMode)
if err != nil {
return "", fmt.Errorf("could not open %s (%v)", partpath, err)
}
lg.Info("created temporary db file", zap.String("path", partpath))

start := time.Now()
resp, err := cli.SnapshotWithVersion(ctx)
if err != nil {
return "", err
}

defer resp.Snapshot.Close()
lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0]))
var size int64
Expand All @@ -94,54 +85,31 @@ func SaveWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, d
zap.String("etcd-version", resp.Version),
)

if err = os.Rename(partpath, dbPath); err != nil {
return resp.Version, fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err)
}
lg.Info("saved", zap.String("path", dbPath))
lg.Info("saved", zap.String("path", "/dev/stdout"))
partPath := f.Name()
dbPath := strings.TrimSuffix(partPath, ".part")
if f != os.Stdout {
if err := os.Rename(partPath, dbPath); err != nil {
return resp.Version, fmt.Errorf("could not rename %s to %s (%v)", partPath, dbPath, err)
}
}
lg.Info("finished", zap.String("path", dbPath))
return resp.Version, nil
}

func PipeWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config) (string, error) {
cfg.Logger = lg.Named("client")
if len(cfg.Endpoints) != 1 {
return "", fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)
}
cli, err := clientv3.New(cfg)
func SaveWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath string) (string, error) {
partPath := dbPath + ".part"
f, err := os.OpenFile(partPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fileutil.PrivateFileMode)
if err != nil {
return "", err
return "", fmt.Errorf("could not open %s (%v)", partPath, err)
}
defer cli.Close()
lg.Info("created temporary db file", zap.String("path", partPath))

start := time.Now()
resp, err := cli.SnapshotWithVersion(ctx)
if err != nil {
return "", err
}
defer resp.Snapshot.Close()
lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0]))
var size int64
f := os.Stdout
size, err = io.Copy(f, resp.Snapshot)
if err != nil {
return resp.Version, err
}
if !hasChecksum(size) {
return resp.Version, fmt.Errorf("sha256 checksum not found [bytes: %d]", size)
}
if err = fileutil.Fsync(f); err != nil {
return resp.Version, err
}
if err = f.Close(); err != nil {
return resp.Version, err
}
lg.Info("fetched snapshot",
zap.String("endpoint", cfg.Endpoints[0]),
zap.String("size", humanize.Bytes(uint64(size))),
zap.Duration("took", time.Since(start)),
zap.String("etcd-version", resp.Version),
)
defer os.RemoveAll(partPath)
defer f.Close()

return GetSnapshotWithVersion(ctx, lg, cfg, f)
}

lg.Info("saved", zap.String("path", "/dev/stdout"))
return resp.Version, nil
func PipeWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config) (string, error) {
return GetSnapshotWithVersion(ctx, lg, cfg, os.Stdout)
}
2 changes: 1 addition & 1 deletion etcdctl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ DEFRAG returns a zero exit code only if it succeeded defragmenting all given end

SNAPSHOT provides commands to restore a snapshot of a running etcd server into a fresh cluster.

### SNAPSHOT PIPE \<filename\>
### SNAPSHOT PIPE

SNAPSHOT PIPE writes a point-in-time snapshot of the etcd backend database to stdout.

Expand Down

0 comments on commit 1a4add0

Please sign in to comment.