Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdctl: add etcdctl snapshot pipe command #16243

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG/CHANGELOG-3.6.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
### etcdctl v3

- Add command to generate [shell completion](https://github.com/etcd-io/etcd/pull/13133).
- Add command [snapshot pipe](https://github.com/etcd-io/etcd/pull/16243).
- When print endpoint status, [show db size in use](https://github.com/etcd-io/etcd/pull/13639)
- [Always print the raft_term in decimal](https://github.com/etcd-io/etcd/pull/13711) when displaying member list in json.
- [Add one more field `storageVersion`](https://github.com/etcd-io/etcd/pull/13773) into the response of command `etcdctl endpoint status`.
Expand Down
47 changes: 30 additions & 17 deletions client/v3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"os"
"strings"
"time"

"github.com/dustin/go-humanize"
Expand All @@ -37,15 +38,14 @@ func hasChecksum(n int64) bool {
return (n % 512) == sha256.Size
}

// SaveWithVersion fetches snapshot from remote etcd server, saves data
// WriteSnapshotWithVersion fetches snapshot from remote etcd server, saves data
// to target path and returns server version. If the context "ctx" is canceled or timed out,
// snapshot save stream will error out (e.g. context.Canceled,
// context.DeadlineExceeded). Make sure to specify only one endpoint
// in client configuration. Snapshot API must be requested to a
// selected node, and saved snapshot is the point-in-time state of
// the selected node.
// Etcd <v3.6 will return "" as version.
func SaveWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath string) (string, error) {
// the selected node. Nota bene: etcd < v3.6 will return "" as version.
func WriteSnapshotWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, f *os.File) (string, error) {
cfg.Logger = lg.Named("client")
if len(cfg.Endpoints) != 1 {
return "", fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)
Expand All @@ -56,21 +56,12 @@ func SaveWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, d
}
defer cli.Close()

partpath := dbPath + ".part"
defer os.RemoveAll(partpath)

var f *os.File
f, err = os.OpenFile(partpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fileutil.PrivateFileMode)
if err != nil {
return "", fmt.Errorf("could not open %s (%v)", partpath, err)
}
lg.Info("created temporary db file", zap.String("path", partpath))

start := time.Now()
resp, err := cli.SnapshotWithVersion(ctx)
if err != nil {
return "", err
}

defer resp.Snapshot.Close()
lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0]))
var size int64
Expand All @@ -94,9 +85,31 @@ func SaveWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, d
zap.String("etcd-version", resp.Version),
)

if err = os.Rename(partpath, dbPath); err != nil {
return resp.Version, fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err)
partPath := f.Name()
dbPath := strings.TrimSuffix(partPath, ".part")
if f != os.Stdout {
if err := os.Rename(partPath, dbPath); err != nil {
return resp.Version, fmt.Errorf("could not rename %s to %s (%v)", partPath, dbPath, err)
}
}
lg.Info("saved", zap.String("path", dbPath))
lg.Info("finished", zap.String("path", dbPath))
return resp.Version, nil
}

func SaveWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath string) (string, error) {
partPath := dbPath + ".part"
f, err := os.OpenFile(partPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fileutil.PrivateFileMode)
if err != nil {
return "", fmt.Errorf("could not open %s (%v)", partPath, err)
}
lg.Info("created temporary db file", zap.String("path", partPath))

defer os.RemoveAll(partPath)
defer f.Close()

return WriteSnapshotWithVersion(ctx, lg, cfg, f)
}

func PipeWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config) (string, error) {
return WriteSnapshotWithVersion(ctx, lg, cfg, os.Stdout)
}
15 changes: 15 additions & 0 deletions etcdctl/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,21 @@ DEFRAG returns a zero exit code only if it succeeded defragmenting all given end

SNAPSHOT provides commands to restore a snapshot of a running etcd server into a fresh cluster.

### SNAPSHOT PIPE

SNAPSHOT PIPE writes a point-in-time snapshot of the etcd backend database to stdout.

#### Output

The backend snapshot is written to stdout.

#### Example

Write a snapshot to stdout:
```
./etcdctl snapshot pipe > snapshot.db
```

### SNAPSHOT SAVE \<filename\>

SNAPSHOT SAVE writes a point-in-time snapshot of the etcd backend database to a file.
Expand Down
30 changes: 30 additions & 0 deletions etcdctl/ctlv3/command/snapshot_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func NewSnapshotCommand() *cobra.Command {
Short: "Manages etcd node snapshots",
}
cmd.AddCommand(NewSnapshotSaveCommand())
cmd.AddCommand(NewSnapshotPipeCommand())
return cmd
}

Expand All @@ -44,6 +45,14 @@ func NewSnapshotSaveCommand() *cobra.Command {
}
}

func NewSnapshotPipeCommand() *cobra.Command {
return &cobra.Command{
Use: "pipe",
Short: "Streams an etcd node backend snapshot to stdout",
Run: snapshotPipeCommandFunc,
}
}

func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
err := fmt.Errorf("snapshot save expects one argument")
Expand Down Expand Up @@ -73,3 +82,24 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
fmt.Printf("Server version %s\n", version)
}
}

func snapshotPipeCommandFunc(cmd *cobra.Command, args []string) {

lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitError, err)
}
cfg := mustClientCfgFromCmd(cmd)

// if user does not specify "--command-timeout" flag, there will be no timeout for snapshot pipe command
ctx, cancel := context.WithCancel(context.Background())
if isCommandTimeoutFlagSet(cmd) {
ctx, cancel = commandCtx(cmd)
}
defer cancel()

_, err = snapshot.PipeWithVersion(ctx, lg, *cfg)
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitInterrupted, err)
}
}
Loading