diff --git a/client/v3/snapshot/v3_snapshot.go b/client/v3/snapshot/v3_snapshot.go index b2db5f0c1371..1544f3883e83 100644 --- a/client/v3/snapshot/v3_snapshot.go +++ b/client/v3/snapshot/v3_snapshot.go @@ -98,5 +98,50 @@ func SaveWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, d return resp.Version, fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err) } lg.Info("saved", zap.String("path", dbPath)) + lg.Info("saved", zap.String("path", "/dev/stdout")) + return resp.Version, nil +} + +func PipeWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config) (string, error) { + cfg.Logger = lg.Named("client") + if len(cfg.Endpoints) != 1 { + return "", fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints) + } + cli, err := clientv3.New(cfg) + if err != nil { + return "", err + } + defer cli.Close() + + start := time.Now() + resp, err := cli.SnapshotWithVersion(ctx) + if err != nil { + return "", err + } + defer resp.Snapshot.Close() + lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0])) + var size int64 + f := os.Stdout + size, err = io.Copy(f, resp.Snapshot) + if err != nil { + return resp.Version, err + } + if !hasChecksum(size) { + return resp.Version, fmt.Errorf("sha256 checksum not found [bytes: %d]", size) + } + if err = fileutil.Fsync(f); err != nil { + return resp.Version, err + } + if err = f.Close(); err != nil { + return resp.Version, err + } + lg.Info("fetched snapshot", + zap.String("endpoint", cfg.Endpoints[0]), + zap.String("size", humanize.Bytes(uint64(size))), + zap.Duration("took", time.Since(start)), + zap.String("etcd-version", resp.Version), + ) + + lg.Info("saved", zap.String("path", "/dev/stdout")) return resp.Version, nil } diff --git a/etcdctl/README.md b/etcdctl/README.md index efff407cff58..836a09ed01bb 100644 --- a/etcdctl/README.md +++ b/etcdctl/README.md @@ -1012,6 +1012,21 @@ DEFRAG returns a zero exit code only if it succeeded defragmenting all given end SNAPSHOT provides commands to restore a snapshot of a running etcd server into a fresh cluster. +### SNAPSHOT PIPE \ + +SNAPSHOT PIPE writes a point-in-time snapshot of the etcd backend database to stdout. + +#### Output + +The backend snapshot is written to stdout. + +#### Example + +Write a snapshot to stdout: +``` +./etcdctl snapshot pipe +``` + ### SNAPSHOT SAVE \ SNAPSHOT SAVE writes a point-in-time snapshot of the etcd backend database to a file. diff --git a/etcdctl/ctlv3/command/snapshot_command.go b/etcdctl/ctlv3/command/snapshot_command.go index df317e23cc78..94daf2ff4156 100644 --- a/etcdctl/ctlv3/command/snapshot_command.go +++ b/etcdctl/ctlv3/command/snapshot_command.go @@ -33,6 +33,7 @@ func NewSnapshotCommand() *cobra.Command { Short: "Manages etcd node snapshots", } cmd.AddCommand(NewSnapshotSaveCommand()) + cmd.AddCommand(NewSnapshotPipeCommand()) return cmd } @@ -44,6 +45,14 @@ func NewSnapshotSaveCommand() *cobra.Command { } } +func NewSnapshotPipeCommand() *cobra.Command { + return &cobra.Command{ + Use: "pipe", + Short: "Streams an etcd node backend snapshot to STDOUT", + Run: snapshotPipeCommandFunc, + } +} + func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) { if len(args) != 1 { err := fmt.Errorf("snapshot save expects one argument") @@ -73,3 +82,24 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) { fmt.Printf("Server version %s\n", version) } } + +func snapshotPipeCommandFunc(cmd *cobra.Command, args []string) { + + lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel) + if err != nil { + cobrautl.ExitWithError(cobrautl.ExitError, err) + } + cfg := mustClientCfgFromCmd(cmd) + + // if user does not specify "--command-timeout" flag, there will be no timeout for snapshot pipe command + ctx, cancel := context.WithCancel(context.Background()) + if isCommandTimeoutFlagSet(cmd) { + ctx, cancel = commandCtx(cmd) + } + defer cancel() + + _, err = snapshot.PipeWithVersion(ctx, lg, *cfg) + if err != nil { + cobrautl.ExitWithError(cobrautl.ExitInterrupted, err) + } +}