Skip to content

Commit 8804d83

Browse files
Simone Carusosiddontang
authored andcommitted
Allow to synchronise GTIDs 'OnPosSynced' (go-mysql-org#378)
1 parent 621cc12 commit 8804d83

File tree

6 files changed

+20
-14
lines changed

6 files changed

+20
-14
lines changed

.travis.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ before_install:
1919
- "sudo service mysql stop || true"
2020
- "echo '[mysqld]' | sudo tee /etc/mysql/conf.d/replication.cnf"
2121
- "echo 'server-id=1' | sudo tee -a /etc/mysql/conf.d/replication.cnf"
22-
- "echo 'log-bin=mysql' | sudo tee -a /etc/mysql/conf.d/replication.cnf"
22+
- "echo 'log-bin=mysql' | sudo tee -a /etc/mysql/conf.d/replication.cnf"
2323
- "echo 'binlog-format = row' | sudo tee -a /etc/mysql/conf.d/replication.cnf"
24+
- "echo 'gtid-mode = ON' | sudo tee -a /etc/mysql/conf.d/replication.cnf"
25+
- "echo 'enforce_gtid_consistency = ON' | sudo tee -a /etc/mysql/conf.d/replication.cnf"
2426

2527
# Start mysql (avoid errors to have logs)
2628
- "sudo service mysql start || true"

canal/canal.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,6 @@ func (c *Canal) run() error {
234234

235235
func (c *Canal) Close() {
236236
log.Infof("closing canal")
237-
238237
c.m.Lock()
239238
defer c.m.Unlock()
240239

@@ -245,7 +244,7 @@ func (c *Canal) Close() {
245244
c.connLock.Unlock()
246245
c.syncer.Close()
247246

248-
c.eventHandler.OnPosSynced(c.master.Position(), true)
247+
c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true)
249248
}
250249

251250
func (c *Canal) WaitDumpDone() <-chan struct{} {

canal/canal_test.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,11 @@ func (s *canalTestSuite) SetUpSuite(c *C) {
6262
s.execute(c, "INSERT INTO test.canal_test (content, name) VALUES (?, ?), (?, ?), (?, ?)", "1", "a", `\0\ndsfasdf`, "b", "", "c")
6363

6464
s.execute(c, "SET GLOBAL binlog_format = 'ROW'")
65-
65+
6666
s.c.SetEventHandler(&testEventHandler{c: c})
6767
go func() {
68-
err = s.c.Run()
68+
set, _ := mysql.ParseGTIDSet("mysql", "")
69+
err = s.c.StartFromGTID(set)
6970
c.Assert(err, IsNil)
7071
}()
7172
}
@@ -89,7 +90,6 @@ func (s *canalTestSuite) execute(c *C, query string, args ...interface{}) *mysql
8990

9091
type testEventHandler struct {
9192
DummyEventHandler
92-
9393
c *C
9494
}
9595

@@ -102,6 +102,10 @@ func (h *testEventHandler) String() string {
102102
return "testEventHandler"
103103
}
104104

105+
func (h *testEventHandler) OnPosSynced(p mysql.Position, set mysql.GTIDSet, f bool) error {
106+
return nil
107+
}
108+
105109
func (s *canalTestSuite) TestCanal(c *C) {
106110
<-s.c.WaitDumpDone()
107111

canal/dump.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func (c *Canal) dump() error {
158158

159159
pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)}
160160
c.master.Update(pos)
161-
if err := c.eventHandler.OnPosSynced(pos, true); err != nil {
161+
if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true); err != nil {
162162
return errors.Trace(err)
163163
}
164164
var startPos fmt.Stringer = pos

canal/handler.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type EventHandler interface {
1616
OnXID(nextPos mysql.Position) error
1717
OnGTID(gtid mysql.GTIDSet) error
1818
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
19-
OnPosSynced(pos mysql.Position, force bool) error
19+
OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error
2020
String() string
2121
}
2222

@@ -28,11 +28,12 @@ func (h *DummyEventHandler) OnTableChanged(schema string, table string) error {
2828
func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
2929
return nil
3030
}
31-
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
32-
func (h *DummyEventHandler) OnXID(mysql.Position) error { return nil }
33-
func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error { return nil }
34-
func (h *DummyEventHandler) OnPosSynced(mysql.Position, bool) error { return nil }
35-
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
31+
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
32+
func (h *DummyEventHandler) OnXID(mysql.Position) error { return nil }
33+
func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error { return nil }
34+
func (h *DummyEventHandler) OnPosSynced(mysql.Position, mysql.GTIDSet, bool) error { return nil }
35+
36+
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
3637

3738
// `SetEventHandler` registers the sync handler, you must register your
3839
// own handler before starting Canal.

canal/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (c *Canal) runSyncBinlog() error {
171171
if savePos {
172172
c.master.Update(pos)
173173
c.master.UpdateTimestamp(ev.Header.Timestamp)
174-
if err := c.eventHandler.OnPosSynced(pos, force); err != nil {
174+
if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), force); err != nil {
175175
return errors.Trace(err)
176176
}
177177
}

0 commit comments

Comments
 (0)