Skip to content

Commit fbfe0c7

Browse files
author
Shlomi Noach
committed
Merge pull request #37 from github/postpone-swap-tables-flag-file
postpone-swap-tables-flag-file
2 parents 9a3c607 + 879b2b4 commit fbfe0c7

File tree

4 files changed

+53
-7
lines changed

4 files changed

+53
-7
lines changed

go/base/context.go

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type MigrationContext struct {
5252
ThrottleFlagFile string
5353
ThrottleAdditionalFlagFile string
5454
MaxLoad map[string]int64
55+
PostponeSwapTablesFlagFile string
5556
SwapTablesTimeoutSeconds int64
5657

5758
Noop bool

go/base/utils.go

+8
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package base
77

88
import (
99
"fmt"
10+
"os"
1011
"regexp"
1112
"time"
1213
)
@@ -23,3 +24,10 @@ func PrettifyDurationOutput(d time.Duration) string {
2324
result = prettifyDurationRegexp.ReplaceAllString(result, "")
2425
return result
2526
}
27+
28+
// FileExists reports whether os.Stat succeeds for fileName.
// Any Stat failure is reported as false, so a path that is missing and a
// path that cannot be inspected are not distinguished.
func FileExists(fileName string) bool {
	_, err := os.Stat(fileName)
	return err == nil
}

go/cmd/gh-ost/main.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ func main() {
5353
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
5454
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
5555
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
56+
flag.StringVar(&migrationContext.PostponeSwapTablesFlagFile, "postpone-swap-tables-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Swapping would be ready to perform the moment the file is deleted.")
57+
5658
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'")
5759
quiet := flag.Bool("quiet", false, "quiet")
5860
verbose := flag.Bool("verbose", false, "verbose")
@@ -117,7 +119,7 @@ func main() {
117119
log.Fatale(err)
118120
}
119121

120-
log.Info("starting gh-ost %+v", AppVersion)
122+
log.Infof("starting gh-ost %+v", AppVersion)
121123

122124
migrator := logic.NewMigrator()
123125
err := migrator.Migrate()

go/logic/migrator.go

+41-6
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ type Migrator struct {
4848
voluntaryLockAcquired chan bool
4949
panicAbort chan error
5050

51-
allEventsUpToLockProcessedFlag int64
51+
rowCopyCompleteFlag int64
52+
allEventsUpToLockProcessedInjectedFlag int64
5253
// copyRowsQueue should not be buffered; if buffered some non-damaging but
5354
// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
5455
copyRowsQueue chan tableWriteFunc
@@ -66,7 +67,7 @@ func NewMigrator() *Migrator {
6667
voluntaryLockAcquired: make(chan bool, 1),
6768
panicAbort: make(chan error),
6869

69-
allEventsUpToLockProcessedFlag: 0,
70+
allEventsUpToLockProcessedInjectedFlag: 0,
7071

7172
copyRowsQueue: make(chan tableWriteFunc),
7273
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
@@ -93,13 +94,13 @@ func (this *Migrator) acceptSignals() {
9394
func (this *Migrator) shouldThrottle() (result bool, reason string) {
9495
// User-based throttle
9596
if this.migrationContext.ThrottleFlagFile != "" {
96-
if _, err := os.Stat(this.migrationContext.ThrottleFlagFile); err == nil {
97+
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
9798
// Throttle file defined and exists!
9899
return true, "flag-file"
99100
}
100101
}
101102
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
102-
if _, err := os.Stat(this.migrationContext.ThrottleAdditionalFlagFile); err == nil {
103+
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
103104
// 2nd Throttle file defined and exists!
104105
return true, "flag-file"
105106
}
@@ -109,7 +110,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
109110
if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
110111
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
111112
}
112-
if this.migrationContext.TestOnReplica && (atomic.LoadInt64(&this.allEventsUpToLockProcessedFlag) == 0) {
113+
if this.migrationContext.TestOnReplica && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) == 0) {
113114
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery)
114115
if err != nil {
115116
return true, err.Error()
@@ -172,6 +173,21 @@ func (this *Migrator) throttle(onThrottled func()) {
172173
}
173174
}
174175

176+
// sleepWhileTrue sleeps indefinitely until the given function returns 'false'
177+
// (or fails with error)
178+
func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
179+
for {
180+
shouldSleep, err := operation()
181+
if err != nil {
182+
return err
183+
}
184+
if !shouldSleep {
185+
return nil
186+
}
187+
time.Sleep(time.Second)
188+
}
189+
}
190+
175191
// retryOperation attempts up to `count` attempts at running given function,
176192
// exiting as soon as it returns with non-error.
177193
func (this *Migrator) retryOperation(operation func() error) (err error) {
@@ -205,6 +221,7 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
205221
// consumers and drops any further incoming events that may be left hanging.
206222
func (this *Migrator) consumeRowCopyComplete() {
207223
<-this.rowCopyComplete
224+
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
208225
go func() {
209226
for <-this.rowCopyComplete {
210227
}
@@ -330,6 +347,20 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
330347
log.Debugf("throttling before swapping tables")
331348
})
332349

350+
this.sleepWhileTrue(
351+
func() (bool, error) {
352+
if this.migrationContext.PostponeSwapTablesFlagFile == "" {
353+
return false, nil
354+
}
355+
if base.FileExists(this.migrationContext.PostponeSwapTablesFlagFile) {
356+
// Postpone flag file defined and exists!
357+
log.Debugf("Postponing final table swap as flag file exists: %+v", this.migrationContext.PostponeSwapTablesFlagFile)
358+
return true, nil
359+
}
360+
return false, nil
361+
},
362+
)
363+
333364
if this.migrationContext.TestOnReplica {
334365
return this.stopWritesAndCompleteMigrationOnReplica()
335366
}
@@ -374,8 +405,8 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
374405
return err
375406
}
376407
log.Debugf("Waiting for events up to lock")
408+
atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1)
377409
<-this.allEventsUpToLockProcessed
378-
atomic.StoreInt64(&this.allEventsUpToLockProcessedFlag, 1)
379410
log.Debugf("Done waiting for events up to lock")
380411
this.printStatus()
381412

@@ -687,6 +718,10 @@ func (this *Migrator) iterateChunks() error {
687718
return terminateRowIteration(nil)
688719
}
689720
for {
721+
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
722+
// Done
723+
return nil
724+
}
690725
copyRowsFunc := func() error {
691726
hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues()
692727
if err != nil {

0 commit comments

Comments
 (0)