Skip to content

Commit dc96de1

Browse files
committed
Move restic related functions to archiver package
1 parent b22ebe3 commit dc96de1

File tree

9 files changed

+704
-212
lines changed

9 files changed

+704
-212
lines changed

cluster/cluster.go

+3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/signal18/replication-manager/config"
3535
v3 "github.com/signal18/replication-manager/repmanv3"
3636
"github.com/signal18/replication-manager/router/maxscale"
37+
"github.com/signal18/replication-manager/utils/archiver"
3738
"github.com/signal18/replication-manager/utils/cron"
3839
"github.com/signal18/replication-manager/utils/dbhelper"
3940
"github.com/signal18/replication-manager/utils/logrus/hooks/pushover"
@@ -229,6 +230,7 @@ type Cluster struct {
229230
InResticBackup bool `json:"inResticBackup"`
230231
InRollingRestart bool `json:"inRollingRestart"`
231232
Mailer *mailer.Mailer `json:"-"`
233+
ResticRepo *archiver.ResticRepo `json:"-"`
232234
LastDelayStatPrint time.Time
233235
sync.Mutex
234236
crcTable *crc64.Table
@@ -513,6 +515,7 @@ func (cluster *Cluster) InitFromConf() {
513515
cluster.initScheduler()
514516
cluster.CheckDefaultUser(true)
515517
cluster.SetToolVersions()
518+
cluster.StartResticRepo()
516519
}
517520

518521
func (cluster *Cluster) initOrchetratorNodes() {

cluster/cluster_bck.go

+50-207
Original file line numberDiff line numberDiff line change
@@ -7,94 +7,15 @@
77
package cluster
88

99
import (
10-
"bytes"
11-
"encoding/json"
12-
"errors"
1310
"fmt"
14-
"io"
1511
"os"
16-
"os/exec"
17-
"strconv"
18-
"strings"
19-
"sync"
20-
"time"
2112

2213
"github.com/signal18/replication-manager/config"
23-
v3 "github.com/signal18/replication-manager/repmanv3"
14+
"github.com/signal18/replication-manager/utils/archiver"
2415
"github.com/signal18/replication-manager/utils/state"
16+
"github.com/sirupsen/logrus"
2517
)
2618

27-
/* Replaced by v3.Backup
28-
type Backup struct {
29-
Id string `json:"id"`
30-
ShortId string `json:"short_id"`
31-
Time string `json:"time"`
32-
Tree string `json:"tree"`
33-
Paths []string `json:"paths"`
34-
Hostname string `json:"hostname"`
35-
Username string `json:"username"`
36-
UID int64 `json:"uid"`
37-
GID int64 `json:"gid"`
38-
}
39-
*/
40-
41-
func (cluster *Cluster) CheckResticInstallation() {
42-
if cluster.Conf.BackupRestic && cluster.VersionsMap.Get("restic") == nil {
43-
if err := cluster.SetResticVersion(); err != nil {
44-
cluster.SetState("WARN0121", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0121"], err), ErrFrom: "CLUSTER"})
45-
} else {
46-
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Restic version: %s", cluster.VersionsMap.Get("restic").ToString())
47-
}
48-
}
49-
}
50-
51-
func (cluster *Cluster) ResticPurgeRepo() error {
52-
if cluster.Conf.BackupRestic {
53-
//This will prevent purging while restic is fetching and wait since it's only executed once after a while
54-
if !cluster.canResticFetchRepo {
55-
time.Sleep(time.Second)
56-
return cluster.ResticPurgeRepo()
57-
}
58-
cluster.canResticFetchRepo = false
59-
defer func() { cluster.canResticFetchRepo = true }()
60-
// var stdout, stderr []byte
61-
var stdoutBuf, stderrBuf bytes.Buffer
62-
var errStdout, errStderr error
63-
resticcmd := exec.Command(cluster.Conf.BackupResticBinaryPath, "forget", "--prune", "--keep-last", "10", "--keep-hourly", strconv.Itoa(cluster.Conf.BackupKeepHourly), "--keep-daily", strconv.Itoa(cluster.Conf.BackupKeepDaily), "--keep-weekly", strconv.Itoa(cluster.Conf.BackupKeepWeekly), "--keep-monthly", strconv.Itoa(cluster.Conf.BackupKeepMonthly), "--keep-yearly", strconv.Itoa(cluster.Conf.BackupKeepYearly))
64-
stdoutIn, _ := resticcmd.StdoutPipe()
65-
stderrIn, _ := resticcmd.StderrPipe()
66-
stdoutTee := io.TeeReader(stdoutIn, &stdoutBuf)
67-
stderrTee := io.TeeReader(stderrIn, &stderrBuf)
68-
resticcmd.Env = cluster.ResticGetEnv()
69-
if err := resticcmd.Start(); err != nil {
70-
cluster.SetState("WARN0094", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0094"], resticcmd.Path, err, ""), ErrFrom: "BACKUP"})
71-
return err
72-
}
73-
var wg sync.WaitGroup
74-
wg.Add(2)
75-
go func() {
76-
defer wg.Done()
77-
cluster.CopyLogs(stdoutTee, config.ConstLogModSST, config.LvlDbg, "restic")
78-
}()
79-
80-
go func() {
81-
defer wg.Done()
82-
cluster.CopyLogs(stderrTee, config.ConstLogModSST, config.LvlDbg, "restic")
83-
}()
84-
wg.Wait()
85-
86-
err := resticcmd.Wait()
87-
if err != nil {
88-
cluster.SetState("WARN0094", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0094"], err, string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())), ErrFrom: "CHECK"})
89-
return err
90-
}
91-
if errStdout != nil || errStderr != nil {
92-
return errors.New("failed to capture stdout or stderr\n")
93-
}
94-
}
95-
return nil
96-
}
97-
9819
func (cluster *Cluster) ResticGetEnv() []string {
9920
newEnv := append(os.Environ(), "RESTIC_PASSWORD="+cluster.Conf.GetDecryptedValue("backup-restic-password"))
10021
if cluster.Conf.BackupResticAws {
@@ -113,154 +34,76 @@ func (cluster *Cluster) ResticGetEnv() []string {
11334
return newEnv
11435
}
11536

116-
func (cluster *Cluster) ResticInitRepo() error {
117-
if cluster.Conf.BackupRestic {
118-
// var stdout, stderr []byte
119-
var stdoutBuf, stderrBuf bytes.Buffer
120-
var errStdout, errStderr error
121-
resticcmd := exec.Command(cluster.Conf.BackupResticBinaryPath, "init")
122-
stdoutIn, _ := resticcmd.StdoutPipe()
123-
stderrIn, _ := resticcmd.StderrPipe()
124-
stdoutTee := io.TeeReader(stdoutIn, &stdoutBuf)
125-
stderrTee := io.TeeReader(stderrIn, &stderrBuf)
126-
127-
resticcmd.Env = cluster.ResticGetEnv()
128-
if err := resticcmd.Start(); err != nil {
129-
cluster.SetState("WARN0095", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0095"], resticcmd.Path, err, ""), ErrFrom: "BACKUP"})
130-
return err
37+
func (cluster *Cluster) CheckResticInstallation() {
38+
if cluster.Conf.BackupRestic && cluster.VersionsMap.Get("restic") == nil {
39+
if err := cluster.SetResticVersion(); err != nil {
40+
cluster.SetState("WARN0121", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0121"], err), ErrFrom: "CLUSTER"})
41+
} else {
42+
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Restic version: %s", cluster.VersionsMap.Get("restic").ToString())
13143
}
132-
var wg sync.WaitGroup
133-
wg.Add(2)
134-
go func() {
135-
defer wg.Done()
136-
cluster.CopyLogs(stdoutTee, config.ConstLogModSST, config.LvlDbg, "restic")
137-
}()
44+
}
45+
}
13846

139-
go func() {
140-
defer wg.Done()
141-
cluster.CopyLogs(stderrTee, config.ConstLogModSST, config.LvlDbg, "restic")
142-
}()
143-
wg.Wait()
47+
func (cluster *Cluster) StartResticRepo() error {
48+
if !cluster.Conf.BackupRestic {
49+
return nil
50+
}
14451

145-
err := resticcmd.Wait()
146-
if err != nil {
147-
cluster.SetState("WARN0095", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0095"], err, string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())), ErrFrom: "CHECK"})
148-
}
149-
if errStdout != nil || errStderr != nil {
150-
return errors.New("failed to capture stdout or stderr\n")
151-
}
52+
var loglevel logrus.Level
53+
if cluster.Conf.LogArchiveLevel > 0 {
54+
loglevel = config.ToLogrusLevel(cluster.Conf.LogArchiveLevel)
15255
}
56+
57+
cluster.ResticRepo = archiver.NewResticRepo(cluster.Conf.BackupResticBinaryPath, cluster.Logrus, logrus.Fields{"cluster": cluster.Name, "type": "log", "module": "restic"}, loglevel)
15358
return nil
15459
}
15560

156-
func (cluster *Cluster) ResticFetchRepo() error {
157-
// No need to add wait since it will be checked each monitor loop
158-
if cluster.Conf.BackupRestic && cluster.canResticFetchRepo {
159-
cluster.canResticFetchRepo = false
160-
defer func() { cluster.canResticFetchRepo = true }()
161-
// var stdout, stderr []byte
162-
var stdoutBuf, stderrBuf bytes.Buffer
163-
var errStdout, errStderr error
164-
resticcmd := exec.Command(cluster.Conf.BackupResticBinaryPath, "snapshots", "--json")
165-
stdoutIn, _ := resticcmd.StdoutPipe()
166-
stderrIn, _ := resticcmd.StderrPipe()
167-
stdoutTee := io.TeeReader(stdoutIn, &stdoutBuf)
168-
stderrTee := io.TeeReader(stderrIn, &stderrBuf)
61+
func (cluster *Cluster) ResticInitRepo() error {
62+
if !cluster.Conf.BackupRestic {
63+
return nil
64+
}
16965

170-
resticcmd.Env = cluster.ResticGetEnv()
171-
if err := resticcmd.Start(); err != nil {
172-
cluster.SetState("WARN0093", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0093"], resticcmd.Path, err, ""), ErrFrom: "BACKUP"})
173-
return err
174-
}
175-
var wg sync.WaitGroup
176-
wg.Add(2)
177-
go func() {
178-
defer wg.Done()
179-
cluster.CopyLogs(stdoutTee, config.ConstLogModSST, config.LvlDbg, "restic")
180-
}()
66+
cluster.ResticRepo.SetEnv(cluster.ResticGetEnv())
67+
err := cluster.ResticRepo.ResticInitRepo()
68+
if err != nil {
69+
cluster.SetState("WARN0092", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0092"], err), ErrFrom: "BACKUP"})
70+
}
18171

182-
go func() {
183-
defer wg.Done()
184-
cluster.CopyLogs(stderrTee, config.ConstLogModSST, config.LvlDbg, "restic")
185-
}()
186-
wg.Wait()
72+
return err
73+
}
18774

188-
err := resticcmd.Wait()
189-
if err != nil {
190-
cluster.SetState("WARN0093", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0093"], err, string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())), ErrFrom: "CHECK"})
191-
cluster.ResticInitRepo()
192-
return err
193-
}
194-
if errStdout != nil || errStderr != nil {
195-
return errors.New("failed to capture stdout or stderr\n")
75+
func (cluster *Cluster) ResticPurgeRepo() error {
76+
if cluster.Conf.BackupRestic {
77+
cluster.ResticRepo.SetEnv(cluster.ResticGetEnv())
78+
79+
opt := archiver.ResticPurgeOption{
80+
KeepHourly: cluster.Conf.BackupKeepHourly,
81+
KeepDaily: cluster.Conf.BackupKeepDaily,
82+
KeepWeekly: cluster.Conf.BackupKeepWeekly,
83+
KeepMonthly: cluster.Conf.BackupKeepMonthly,
84+
KeepYearly: cluster.Conf.BackupKeepYearly,
19685
}
19786

198-
var repo []v3.Backup
199-
err = json.Unmarshal(stdoutBuf.Bytes(), &repo)
87+
_, err := cluster.ResticRepo.AddPurgeTask(opt, true)
20088
if err != nil {
201-
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Error unmaeshal backups %s", err)
89+
cluster.SetState("WARN0094", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0094"], err), ErrFrom: "BACKUP"})
20290
return err
20391
}
204-
var filterRepo []v3.Backup
205-
for _, bck := range repo {
206-
if strings.Contains(bck.Paths[0], cluster.Name) {
207-
filterRepo = append(filterRepo, bck)
208-
}
209-
}
210-
cluster.Backups = filterRepo
211-
212-
cluster.ResticFetchRepoStat()
21392
}
214-
21593
return nil
21694
}
21795

218-
func (cluster *Cluster) ResticFetchRepoStat() error {
219-
220-
var stdoutBuf, stderrBuf bytes.Buffer
221-
var errStdout, errStderr error
222-
resticcmd := exec.Command(cluster.Conf.BackupResticBinaryPath, "stats", "--mode", "raw-data", "--json")
223-
stdoutIn, _ := resticcmd.StdoutPipe()
224-
stderrIn, _ := resticcmd.StderrPipe()
225-
stdoutTee := io.TeeReader(stdoutIn, &stdoutBuf)
226-
stderrTee := io.TeeReader(stderrIn, &stderrBuf)
227-
228-
resticcmd.Env = cluster.ResticGetEnv()
229-
if err := resticcmd.Start(); err != nil {
230-
cluster.SetState("WARN0093", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0093"], resticcmd.Path, err, ""), ErrFrom: "BACKUP"})
231-
return err
232-
}
233-
var wg sync.WaitGroup
234-
wg.Add(2)
235-
go func() {
236-
defer wg.Done()
237-
cluster.CopyLogs(stdoutTee, config.ConstLogModSST, config.LvlDbg, "restic")
238-
}()
239-
240-
go func() {
241-
defer wg.Done()
242-
cluster.CopyLogs(stderrTee, config.ConstLogModSST, config.LvlDbg, "restic")
243-
}()
244-
wg.Wait()
245-
246-
err := resticcmd.Wait()
247-
if err != nil {
248-
cluster.SetState("WARN0093", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0093"], err, string(stdoutBuf.Bytes()), string(stderrBuf.Bytes())), ErrFrom: "CHECK"})
249-
cluster.ResticInitRepo()
250-
return err
251-
}
252-
if errStdout != nil || errStderr != nil {
253-
return errors.New("failed to capture stdout or stderr\n")
96+
func (cluster *Cluster) ResticFetchRepo() error {
97+
// No need to add wait since it will be checked each monitor loop
98+
if !cluster.Conf.BackupRestic {
99+
return nil
254100
}
255101

256-
var repostat v3.BackupStat
257-
err = json.Unmarshal(stdoutBuf.Bytes(), &repostat)
102+
cluster.ResticRepo.SetEnv(cluster.ResticGetEnv())
103+
_, err := cluster.ResticRepo.AddFetchTask(true)
258104
if err != nil {
259-
cluster.LogModulePrintf(cluster.Conf.Verbose, config.ConstLogModGeneral, config.LvlInfo, "Error unmarshal backups %s", err)
260-
return err
105+
cluster.SetState("WARN0091", state.State{ErrType: "WARNING", ErrDesc: fmt.Sprintf(clusterError["WARN0091"], err), ErrFrom: "BACKUP"})
261106
}
262-
cluster.BackupStat = repostat
263-
// }
264107

265-
return nil
108+
return err
266109
}

cluster/cluster_get.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
auth "github.com/hashicorp/vault/api/auth/approle"
2424
"github.com/siddontang/go/log"
2525
"github.com/signal18/replication-manager/config"
26-
v3 "github.com/signal18/replication-manager/repmanv3"
26+
"github.com/signal18/replication-manager/utils/archiver"
2727
"github.com/signal18/replication-manager/utils/cron"
2828
"github.com/signal18/replication-manager/utils/dbhelper"
2929
"github.com/signal18/replication-manager/utils/misc"
@@ -1023,8 +1023,12 @@ func (cluster *Cluster) GetTableDLLNoFK(schema string, table string, srv *Server
10231023
return ddl, err
10241024
}
10251025

1026-
func (cluster *Cluster) GetBackups() []v3.Backup {
1027-
return cluster.Backups
1026+
func (cluster *Cluster) GetBackups() []archiver.Backup {
1027+
if cluster.ResticRepo == nil {
1028+
return make([]archiver.Backup, 0)
1029+
}
1030+
1031+
return cluster.ResticRepo.Backups
10281032
}
10291033

10301034
func (cluster *Cluster) GetQueryRules() []config.QueryRule {

cluster/cluster_tgl.go

+3
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ func (cluster *Cluster) SwitchProvDockerDaemonPrivate() {
134134
func (cluster *Cluster) SwitchBackupRestic() {
135135
cluster.Conf.BackupRestic = !cluster.Conf.BackupRestic
136136
cluster.CheckResticInstallation()
137+
if cluster.ResticRepo == nil {
138+
cluster.StartResticRepo()
139+
}
137140
}
138141

139142
func (cluster *Cluster) SwitchBackupResticAws() {

config/config.go

+5
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ type Config struct {
141141
LogGraphiteLevel int `mapstructure:"log-graphite-level" toml:"log-graphite-level" json:"logGraphiteLevel"`
142142
LogBinlogPurge bool `mapstructure:"log-binlog-purge" toml:"log-binlog-purge" json:"logBinlogPurge"`
143143
LogBinlogPurgeLevel int `mapstructure:"log-binlog-purge-level" toml:"log-binlog-purge-level" json:"logBinlogPurgeLevel"`
144+
LogArchiveLevel int `mapstructure:"log-archive-level" toml:"log-archive-level" json:"logArchiveLevel"`
144145
User string `mapstructure:"db-servers-credential" toml:"db-servers-credential" json:"dbServersCredential"`
145146
Hosts string `mapstructure:"db-servers-hosts" toml:"db-servers-hosts" json:"dbServersHosts"`
146147
DbServersChangeStateScript string `mapstructure:"db-servers-state-change-script" toml:"db-servers-state-change-script" json:"dbServersStateChangeScript"`
@@ -650,6 +651,7 @@ type Config struct {
650651
BackupResticRepository string `mapstructure:"backup-restic-repository" toml:"backup-restic-repository" json:"backupResticRepository"`
651652
BackupResticPassword string `mapstructure:"backup-restic-password" toml:"backup-restic-password" json:"-"`
652653
BackupResticAws bool `mapstructure:"backup-restic-aws" toml:"backup-restic-aws" json:"backupResticAws"`
654+
BackupResticTimeout int `mapstructure:"backup-restic-timeout" toml:"backup-restic-timeout" json:"backupResticTimeout"`
653655
BackupStreaming bool `mapstructure:"backup-streaming" toml:"backup-streaming" json:"backupStreaming"`
654656
BackupStreamingDebug bool `mapstructure:"backup-streaming-debug" toml:"backup-streaming-debug" json:"backupStreamingDebug"`
655657
BackupStreamingAwsAccessKeyId string `mapstructure:"backup-streaming-aws-access-key-id" toml:"backup-streaming-aws-access-key-id" json:"-"`
@@ -1161,6 +1163,7 @@ const (
11611163
ConstLogModGraphite = 15
11621164
ConstLogModPurge = 16
11631165
ConstLogModTask = 17
1166+
ConstLogModArchive = 18
11641167
)
11651168

11661169
/*
@@ -3001,6 +3004,8 @@ func (conf *Config) IsEligibleForPrinting(module int, level string) bool {
30013004
if conf.LogTask {
30023005
return conf.LogTaskLevel >= lvl
30033006
}
3007+
case module == ConstLogModArchive:
3008+
return conf.LogArchiveLevel >= lvl
30043009
}
30053010
}
30063011

0 commit comments

Comments
 (0)