Skip to content

Commit 97cf753

Browse files
committed
fix database connection pool
1 parent 368191e commit 97cf753

File tree

3 files changed

+87
-57
lines changed

3 files changed

+87
-57
lines changed

go/src/app/binlogsync/binlog_sync.go

Lines changed: 54 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@ package main
22

33
import (
44
"context"
5+
"database/sql"
56
"fmt"
67
"log"
78
"reflect"
89
"sort"
910
"sync"
1011
"time"
1112

13+
"github.com/go-sql-driver/mysql"
1214
uuid "github.com/satori/go.uuid"
13-
"github.com/siddontang/go-mysql/client"
1415
"github.com/siddontang/go-mysql/replication"
1516
)
1617

@@ -24,7 +25,7 @@ type Connection struct {
2425
}
2526

2627
type DBInfo struct {
27-
Conn *client.Conn
28+
Conn *sql.DB
2829
Shard *DBShard
2930
}
3031

@@ -38,21 +39,22 @@ type WriteEvent struct {
3839
event *replication.BinlogEvent
3940
before map[string]interface{}
4041
after map[string]interface{}
41-
dbInfo *DBInfo
4242
nextGTID string
4343
}
4444

4545
type BinlogSyncer struct {
4646
Config
4747

48-
DBPool map[string]*client.Conn
48+
DBPool map[string]*sql.DB
4949
WriteChs []chan *WriteEvent
5050
CountCh chan *Status
5151

5252
mutex *sync.Mutex
5353
wg *sync.WaitGroup
5454
shellLog *log.Logger
5555
fileLog *log.Logger
56+
57+
stop bool
5658
}
5759

5860
type BinlogPosition struct {
@@ -62,11 +64,11 @@ type BinlogPosition struct {
6264
}
6365

6466
type OutMessage struct {
65-
Synced int64 `yaml:"Synced"`
66-
Faild int64 `yaml:"Faild"`
67-
Rate float64 `yaml:"Rate(rows/s)"`
68-
Position BinlogPosition `yaml:"Position"`
69-
Error map[string]int `yaml:"Error"`
67+
Synced int64 `yaml:"Synced"`
68+
Faild int64 `yaml:"Faild"`
69+
RowsPerSec float64 `yaml:"RowsPerSec"`
70+
Position BinlogPosition `yaml:"Position"`
71+
Error map[string]int `yaml:"Error"`
7072
}
7173

7274
type Status struct {
@@ -83,14 +85,15 @@ func NewBinlogSyncer(conf Config) *BinlogSyncer {
8385
bs := &BinlogSyncer{
8486
Config: conf,
8587

86-
DBPool: make(map[string]*client.Conn),
88+
DBPool: make(map[string]*sql.DB),
8789
WriteChs: make([]chan *WriteEvent, conf.WorkerCnt),
8890
CountCh: make(chan *Status, channelCapacity*conf.WorkerCnt),
8991

9092
mutex: &sync.Mutex{},
9193
wg: &sync.WaitGroup{},
9294
shellLog: shellLog,
9395
fileLog: fileLog,
96+
stop: false,
9497
}
9598

9699
return bs
@@ -131,9 +134,8 @@ func (bs *BinlogSyncer) Sync() {
131134
}
132135

133136
func (bs *BinlogSyncer) formatRow(srcRow []interface{}) map[string]interface{} {
134-
// the first column `id` should not put in new rowValue
135-
rowValue := make([]interface{}, len(srcRow)-1)
136-
for i, v := range srcRow[1:] {
137+
rowValue := make([]interface{}, len(srcRow))
138+
for i, v := range srcRow {
137139
if v == nil {
138140
continue
139141
}
@@ -163,10 +165,8 @@ func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) {
163165

164166
for ev := range inCh {
165167
if ev.event.Header.EventType == replication.ROTATE_EVENT {
166-
rotateEv, ok := ev.event.Event.(*replication.RotateEvent)
167-
if !ok {
168-
bs.fileLog.Printf("event is RotateEvent, but cannot convert to a RotateEvent")
169-
}
168+
rotateEv, _ := ev.event.Event.(*replication.RotateEvent)
169+
170170
stat := &Status{
171171
goroutineIndex: chIdx,
172172
position: BinlogPosition{
@@ -178,10 +178,12 @@ func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) {
178178
continue
179179
}
180180

181+
var dbInfo *DBInfo
181182
var err error
182-
ev.dbInfo, err = bs.getEventConnection(ev.after)
183+
dbInfo, err = bs.getWriteConnection(ev.after)
183184
if err != nil {
184185
bs.shellLog.Printf("[%s] get connection failed: %v\n", bs.SourceConn.Addr, err)
186+
bs.stop = true
185187
return
186188
}
187189

@@ -198,25 +200,28 @@ func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) {
198200
bs.fileLog.Printf("routin index: %d, before: %v, after: %v, event type: %v\n", chIdx, ev.before, ev.after, evType)
199201

200202
if evType == replication.UPDATE_ROWS_EVENTv2 || evType == replication.UPDATE_ROWS_EVENTv1 || evType == replication.UPDATE_ROWS_EVENTv0 {
201-
sql = makeUpdateSql(ev.dbInfo.Shard.Table, bs.TableIndex, bs.TableField, index, value)
203+
sql = makeUpdateSql(dbInfo.Shard.Table, bs.TableField, bs.TableField, value, value)
202204

203205
} else if evType == replication.DELETE_ROWS_EVENTv2 || evType == replication.DELETE_ROWS_EVENTv1 || evType == replication.DELETE_ROWS_EVENTv0 {
204206

205-
sql = makeDeleteSql(ev.dbInfo.Shard.Table, bs.TableIndex, index)
207+
sql = makeDeleteSql(dbInfo.Shard.Table, bs.TableIndex, index)
206208
} else if evType == replication.WRITE_ROWS_EVENTv2 || evType == replication.WRITE_ROWS_EVENTv1 || evType == replication.WRITE_ROWS_EVENTv0 {
207209

208-
sql = makeInsertSql(ev.dbInfo.Shard.Table, bs.TableField, value)
210+
sql = makeInsertSql(dbInfo.Shard.Table, bs.TableField, value)
209211
} else {
210212
continue
211213
}
212214

213215
bs.fileLog.Printf("routin index: %d, get sql statement: %v", chIdx, sql)
214216

215-
bs.mutex.Lock()
216-
_, err = ev.dbInfo.Conn.Execute(sql)
217-
bs.mutex.Unlock()
217+
_, err = dbInfo.Conn.Exec(sql)
218218
if err != nil {
219219
bs.fileLog.Printf("Execute error: %v\n", err)
220+
221+
sqlErr := err.(*mysql.MySQLError)
222+
if sqlErr.Number != 1062 {
223+
bs.stop = true
224+
}
220225
}
221226

222227
stat := &Status{
@@ -254,11 +259,7 @@ func (bs *BinlogSyncer) readBinlog(binlogReader *replication.BinlogStreamer) {
254259
}
255260

256261
if ev.Header.EventType == replication.GTID_EVENT {
257-
gtidEv, ok := ev.Event.(*replication.GTIDEvent)
258-
if !ok {
259-
bs.fileLog.Printf("event is GTIDEvent, but cannot convert to a GTIDEvent")
260-
continue
261-
}
262+
gtidEv, _ := ev.Event.(*replication.GTIDEvent)
262263

263264
u, _ := uuid.FromBytes(gtidEv.SID)
264265
gtidNext = fmt.Sprintf("%s:%d", u.String(), gtidEv.GNO)
@@ -278,10 +279,13 @@ func (bs *BinlogSyncer) readBinlog(binlogReader *replication.BinlogStreamer) {
278279

279280
rowHash, err := hashStringSliceToInt32(indexValues)
280281
if err != nil {
281-
bs.shellLog.Printf("[%s] calculate hash failed: %v", bs.SourceConn.Addr, err)
282+
bs.shellLog.Panicf("[%s] calculate hash failed: %v", bs.SourceConn.Addr, err)
282283
}
283284

284285
chIdx := rowHash % int64(bs.WorkerCnt)
286+
if bs.stop {
287+
break
288+
}
285289
bs.WriteChs[chIdx] <- writeEV
286290
}
287291

@@ -296,29 +300,30 @@ func (bs *BinlogSyncer) collector() {
296300
var errTypes = make(map[string]int)
297301

298302
var start = time.Now()
303+
299304
for outStat := range bs.CountCh {
300305
if outStat.err != nil {
301306
errCnt += 1
302307
errTypes[outStat.err.Error()] += 1
303308
}
309+
304310
rowCnt += 1
305311

306312
if rowCnt%bs.TickCnt == 0 {
307313

308-
rate := float64(bs.TickCnt) / time.Since(start).Seconds()
314+
rowsPerSec := float64(bs.TickCnt) / time.Since(start).Seconds()
309315

310316
om := &OutMessage{
311-
Synced: rowCnt,
312-
Faild: errCnt,
313-
Rate: rate,
314-
Position: outStat.position,
315-
Error: errTypes,
317+
Synced: rowCnt,
318+
Faild: errCnt,
319+
RowsPerSec: rowsPerSec,
320+
Position: outStat.position,
321+
Error: errTypes,
316322
}
317323

318-
outMessage, err := dumpYAML(om)
324+
outMessage, err := marshalYAML(om)
319325
if err != nil {
320326
bs.fileLog.Printf("[%s] dump out message to YAML failed: %v\n", bs.SourceConn.Addr, outMessage)
321-
return
322327
}
323328

324329
outMessage = " ====== [" + bs.SourceConn.Addr + "] status ======\n" + outMessage
@@ -329,7 +334,7 @@ func (bs *BinlogSyncer) collector() {
329334
}
330335
}
331336

332-
func (bs *BinlogSyncer) getEventConnection(row map[string]interface{}) (*DBInfo, error) {
337+
func (bs *BinlogSyncer) getWriteConnection(row map[string]interface{}) (*DBInfo, error) {
333338
shardValues := make([]interface{}, len(bs.TableShard))
334339
for i, k := range bs.TableShard {
335340
shardValues[i] = row[k]
@@ -342,8 +347,11 @@ func (bs *BinlogSyncer) getEventConnection(row map[string]interface{}) (*DBInfo,
342347
if conn == nil {
343348
addr := bs.DBConfig[shard.DBPort]
344349

350+
// DSN(Data Source Name) in go-sql-driver
351+
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s", addr.User, addr.Password, addr.Addr, addr.DBName)
352+
345353
var err error
346-
conn, err = client.Connect(addr.Addr, addr.User, addr.Password, addr.DBName)
354+
conn, err = sql.Open("mysql", dsn)
347355
if err != nil {
348356
bs.shellLog.Printf("[%s] get connection failed: %v\n", bs.SourceConn, err)
349357
return nil, err
@@ -361,25 +369,23 @@ func (bs *BinlogSyncer) getEventConnection(row map[string]interface{}) (*DBInfo,
361369

362370
func (bs *BinlogSyncer) findShards(tbShards []interface{}) *DBShard {
363371

372+
lenShards := len(bs.Shards)
364373
//conf.Shards should be descending
365-
i := sort.Search(len(bs.Shards), func(i int) bool {
374+
i := sort.Search(lenShards, func(i int) bool {
366375
shard := bs.Shards[i].From
367376
rst, err := compareSlice(shard, tbShards)
368377
if err != nil {
369-
bs.shellLog.Printf("[%s] compare table shards failed: %v\n", bs.SourceConn.Addr, err)
378+
bs.shellLog.Panicf("[%s] compare table shards failed: %v\n", bs.SourceConn.Addr, err)
370379
}
371380

372-
if rst <= 0 {
373-
return true
374-
}
375-
return false
381+
return rst <= 0
376382
})
377383

378-
if i >= 0 && i < len(bs.Shards) {
384+
if i >= 0 && i < lenShards {
379385
return &bs.Shards[i]
380386
}
381387

382-
bs.shellLog.Printf("[%s] can not find shard: index out of bound", bs.SourceConn.Addr)
388+
bs.shellLog.Printf("[%s] can not find shard: %v, index out of bound, got index: %d, shards len: %d", bs.SourceConn.Addr, tbShards, i, lenShards)
383389
return nil
384390
}
385391

go/src/app/binlogsync/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func main() {
6464

6565
// read config
6666
confList := &ConfigList{}
67-
err = parseYAML(confName, &confList)
67+
err = unmarshalYAML(confName, &confList)
6868
if err != nil {
6969
shellLog.Panicf("read config file failed: %v\n", err)
7070
}

go/src/app/binlogsync/util.go

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,19 @@ func hashStringSliceToInt32(src []string) (int64, error) {
2626
func compareSlice(a, b []interface{}) (int, error) {
2727
lenA := len(a)
2828
lenB := len(b)
29-
if lenA != lenB {
30-
return 0, errors.New(fmt.Sprintf("length of slices not equals: %d != %d", lenA, lenB))
31-
}
3229

3330
for i, v := range a {
31+
32+
if i >= lenB {
33+
return 1, nil
34+
}
35+
3436
var rst int
37+
3538
switch v.(type) {
36-
case int:
37-
ai := reflect.ValueOf(v).Int()
38-
bi := reflect.ValueOf(b[i]).Int()
39+
case int, int8, int16, int32, int64:
40+
ai, _ := interfaceToInt64(v)
41+
bi, _ := interfaceToInt64(b[i])
3942
rst = compareInt(ai, bi)
4043
case string:
4144
rst = strings.Compare(v.(string), b[i].(string))
@@ -50,6 +53,10 @@ func compareSlice(a, b []interface{}) (int, error) {
5053
return rst, nil
5154
}
5255

56+
if lenA < lenB {
57+
return -1, nil
58+
}
59+
5360
return 0, nil
5461
}
5562

@@ -65,7 +72,24 @@ func compareInt(a, b int64) int {
6572
}
6673
}
6774

68-
func parseYAML(filename string, v interface{}) error {
75+
func interfaceToInt64(v interface{}) (int64, error) {
76+
switch v.(type) {
77+
case int:
78+
return int64(v.(int)), nil
79+
case int8:
80+
return int64(v.(int8)), nil
81+
case int16:
82+
return int64(v.(int16)), nil
83+
case int32:
84+
return int64(v.(int32)), nil
85+
case int64:
86+
return v.(int64), nil
87+
default:
88+
return 0, errors.New(fmt.Sprintf("unknow type of value:%v, type: %T", v, v))
89+
}
90+
}
91+
92+
func unmarshalYAML(filename string, v interface{}) error {
6993
data, err := ioutil.ReadFile(filename)
7094
if err != nil {
7195
return err
@@ -79,7 +103,7 @@ func parseYAML(filename string, v interface{}) error {
79103
return nil
80104
}
81105

82-
func dumpYAML(v interface{}) (string, error) {
106+
func marshalYAML(v interface{}) (string, error) {
83107

84108
data, err := yaml.Marshal(v)
85109
if err != nil {

0 commit comments

Comments
 (0)