diff --git a/go/src/app/binlogsync/binlog_sync.go b/go/src/app/binlogsync/binlog_sync.go new file mode 100644 index 0000000..0e5fe65 --- /dev/null +++ b/go/src/app/binlogsync/binlog_sync.go @@ -0,0 +1,430 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "log" + "reflect" + "sort" + "sync" + "time" + + "github.com/go-sql-driver/mysql" + uuid "github.com/satori/go.uuid" + "github.com/siddontang/go-mysql/replication" +) + +type Connection struct { + Addr string `yaml:"Addr"` // can be ip:port or a unix socket domain + Host string `yaml:"Host"` + Port string `yaml:"Port"` + User string `yaml:"User"` + Password string `yaml:"Password"` + DBName string `yaml:"DBName"` +} + +type DBInfo struct { + Conn *sql.DB + Shard *DBShard +} + +type DBShard struct { + From []interface{} `yaml:"From"` + Table string `yaml:"Table"` + DBPort string `yaml:"DBPort"` +} + +type WriteEvent struct { + event *replication.BinlogEvent + before map[string]interface{} + after map[string]interface{} + nextGTID string +} + +type BinlogSyncer struct { + Config + + DBPool map[string]*sql.DB + WriteChs []chan *WriteEvent + CountCh chan *Result + + mutex *sync.Mutex + wg *sync.WaitGroup + shellLog *log.Logger + fileLog *log.Logger + + stat *Status + + stop bool +} + +type BinlogPosition struct { + BinlogFile string `yaml:"BinlogFile"` + BinlogPos int64 `yaml:"BinlogPos"` + NextGTID string `yaml:"GTID"` +} + +type Status struct { + Finished int64 `yaml:"Finished"` + Failed int64 `yaml:"Failed"` + Errors map[string]int `yaml:"Errors"` + RowsPerSec float64 `yaml:"RowsPerSec"` +} + +type Result struct { + err error + goroutineIndex int + + gtid string + + position BinlogPosition +} + +func NewBinlogSyncer(conf *Config) *BinlogSyncer { + fileLog := log.New(logFile, "", log.Ldate|log.Ltime|log.Lshortfile) + fileLog.SetPrefix("[" + conf.SourceConn.Addr + "] ") + + bs := &BinlogSyncer{ + Config: *conf, + + DBPool: make(map[string]*sql.DB), + WriteChs: make([]chan *WriteEvent, conf.WorkerCnt), + CountCh: make(chan *Result, channelCapacity*conf.WorkerCnt), + + mutex: &sync.Mutex{}, + wg: &sync.WaitGroup{}, + shellLog: shellLog, + fileLog: fileLog, + stat: &Status{ + Errors: make(map[string]int), + }, + stop: false, + } + + return bs +} + +func (bs *BinlogSyncer) Sync() { + + var binlogReader *replication.BinlogStreamer + var err error + + if bs.GTIDSet != "" { + binlogReader, err = newBinlogReaderByGTID(&bs.SourceConn, bs.GTIDSet, 9999) + } else { + binlogReader, err = newBinlogReaderByPosition(&bs.SourceConn, bs.BinlogFile, bs.BinlogPos, 9999) + } + + if err != nil { + bs.shellLog.Printf("[%s] make binlog reader failed: %v\n", bs.SourceConn.Addr, err) + return + } + + for i := 0; i < bs.WorkerCnt; i++ { + writeCh := make(chan *WriteEvent, channelCapacity) + bs.WriteChs[i] = writeCh + bs.wg.Add(1) + go bs.writeToDB(i, writeCh) + } + + go bs.readBinlog(binlogReader) + + go func() { + bs.wg.Wait() + close(bs.CountCh) + }() + + bs.collector() +} + +func (bs *BinlogSyncer) formatRow(srcRow []interface{}) map[string]interface{} { + rowValue := make([]interface{}, len(srcRow)) + for i, v := range srcRow { + if v == nil { + continue + } + + var tmp interface{} + if reflect.TypeOf(v).String() == "[]uint8" { + // convert []byte to string + tmp = fmt.Sprintf("%s", v) + } else { + tmp = v + } + + rowValue[i] = tmp + } + + row := make(map[string]interface{}) + + for i, k := range bs.TableField { + row[k] = rowValue[i] + } + + return row +} + +func (bs *BinlogSyncer) writeToDB(chIdx int, inCh chan *WriteEvent) { + defer bs.wg.Done() + + for ev := range inCh { + if ev.event.Header.EventType == replication.ROTATE_EVENT { + rotateEv, _ := ev.event.Event.(*replication.RotateEvent) + + rst := &Result{ + goroutineIndex: chIdx, + position: BinlogPosition{ + BinlogPos: int64(rotateEv.Position), + BinlogFile: string(rotateEv.NextLogName), + }, + } + bs.CountCh <- rst + continue + } + + var dbInfo *DBInfo + var err error + dbInfo, err = bs.getWriteConnection(ev.after) + if err != nil { + bs.shellLog.Printf("[%s] get connection failed: %v\n", bs.SourceConn.Addr, err) + bs.stop = true + return + } + + index := bs.makeTableIndex(ev) + + value := make([]string, len(bs.TableField)) + for i, k := range bs.TableField { + value[i] = fmt.Sprintf("%v", ev.after[k]) + } + + var sql string + evType := ev.event.Header.EventType + + bs.fileLog.Printf("routin index: %d, before: %v, after: %v, event type: %v\n", chIdx, ev.before, ev.after, evType) + + if evType == replication.UPDATE_ROWS_EVENTv2 || evType == replication.UPDATE_ROWS_EVENTv1 || evType == replication.UPDATE_ROWS_EVENTv0 { + sql = makeUpdateSql(dbInfo.Shard.Table, bs.TableField, bs.TableField, value, value) + + } else if evType == replication.DELETE_ROWS_EVENTv2 || evType == replication.DELETE_ROWS_EVENTv1 || evType == replication.DELETE_ROWS_EVENTv0 { + + sql = makeDeleteSql(dbInfo.Shard.Table, bs.TableIndex, index) + } else if evType == replication.WRITE_ROWS_EVENTv2 || evType == replication.WRITE_ROWS_EVENTv1 || evType == replication.WRITE_ROWS_EVENTv0 { + + sql = makeInsertSql(dbInfo.Shard.Table, bs.TableField, value) + } else { + continue + } + + bs.fileLog.Printf("routin index: %d, get sql statement: %v", chIdx, sql) + + _, err = dbInfo.Conn.Exec(sql) + if err != nil { + bs.fileLog.Printf("Execute error: %v\n", err) + + sqlErr := err.(*mysql.MySQLError) + if sqlErr.Number != 1062 { + bs.stop = true + } + } + + rst := &Result{ + err: err, + goroutineIndex: chIdx, + position: BinlogPosition{ + BinlogPos: int64(ev.event.Header.LogPos), + BinlogFile: bs.BinlogFile, + NextGTID: ev.nextGTID, + }, + } + + bs.fileLog.Printf("routin index: %d, event position: %d\n", chIdx, ev.event.Header.LogPos) + bs.CountCh <- rst + } +} + +func (bs *BinlogSyncer) readBinlog(binlogReader *replication.BinlogStreamer) { + var gtidNext string + for { + ctx, _ := context.WithTimeout(context.Background(), 30*time.Second) + ev, err := binlogReader.GetEvent(ctx) + if err != nil { + bs.fileLog.Printf("get event failed: %v\n", err) + break + } + + if ev.Header.EventType == replication.ROTATE_EVENT { + writeEV := &WriteEvent{ + event: ev, + } + // RotateEvent has no row to hash so put it to channal 0 + bs.WriteChs[0] <- writeEV + continue + } + + if ev.Header.EventType == replication.GTID_EVENT { + gtidEv, _ := ev.Event.(*replication.GTIDEvent) + + u, _ := uuid.FromBytes(gtidEv.SID) + gtidNext = fmt.Sprintf("%s:%d", u.String(), gtidEv.GNO) + + bs.fileLog.Printf("next gtid: %s\n", gtidNext) + continue + } + + writeEV := bs.makeWriteEvent(ev) + if writeEV == nil { + // not a RowsEvent + continue + } + writeEV.nextGTID = gtidNext + + indexValues := bs.makeTableIndex(writeEV) + + rowHash, err := hashStringSliceToInt32(indexValues) + if err != nil { + bs.shellLog.Panicf("[%s] calculate hash failed: %v", bs.SourceConn.Addr, err) + } + + chIdx := rowHash % int64(bs.WorkerCnt) + if bs.stop { + break + } + bs.WriteChs[chIdx] <- writeEV + } + + for _, ch := range bs.WriteChs { + close(ch) + } +} + +func (bs *BinlogSyncer) collector() { + + var start = time.Now() + var tickCnt = int64(0) + + ticker := time.NewTicker(time.Duration(time.Minute * 1)) + defer ticker.Stop() + + go func() { + for _ = range ticker.C { + bs.stat.RowsPerSec = float64(tickCnt) / time.Since(start).Seconds() + tickCnt = 0 + start = time.Now() + } + }() + + for rst := range bs.CountCh { + bs.stat.Finished += 1 + tickCnt += 1 + + if rst.err != nil { + bs.stat.Errors[rst.err.Error()] += 1 + bs.stat.Failed += 1 + } + } +} + +func (bs *BinlogSyncer) GetStat() *Status { + return bs.stat +} + +func (bs *BinlogSyncer) getWriteConnection(row map[string]interface{}) (*DBInfo, error) { + shardValues := make([]interface{}, len(bs.TableShard)) + for i, k := range bs.TableShard { + shardValues[i] = row[k] + } + + shard := bs.findShards(shardValues) + + bs.mutex.Lock() + conn := bs.DBPool[shard.DBPort] + if conn == nil { + addr := bs.DBConfig[shard.DBPort] + + // DSN(Data Source Name) in go-sql-driver + dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s", addr.User, addr.Password, addr.Addr, addr.DBName) + + var err error + conn, err = sql.Open("mysql", dsn) + if err != nil { + bs.shellLog.Printf("[%s] get connection failed: %v\n", bs.SourceConn, err) + return nil, err + } + bs.DBPool[addr.Port] = conn + } + bs.mutex.Unlock() + + dbInfo := &DBInfo{ + Conn: conn, + Shard: shard, + } + return dbInfo, nil +} + +func (bs *BinlogSyncer) findShards(tbShards []interface{}) *DBShard { + + lenShards := len(bs.Shards) + //conf.Shards should be descending + i := sort.Search(lenShards, func(i int) bool { + shard := bs.Shards[i].From + rst, err := compareSlice(shard, tbShards) + if err != nil { + bs.shellLog.Panicf("[%s] compare table shards failed: %v\n", bs.SourceConn.Addr, err) + } + + return rst <= 0 + }) + + if i >= 0 && i < lenShards { + return &bs.Shards[i] + } + + bs.shellLog.Printf("[%s] can not find shard: %v, index out of bound, got index: %d, shards len: %d", bs.SourceConn.Addr, tbShards, i, lenShards) + return nil +} + +func (bs *BinlogSyncer) makeWriteEvent(ev *replication.BinlogEvent) *WriteEvent { + var before map[string]interface{} + var after map[string]interface{} + + rowEv, ok := ev.Event.(*replication.RowsEvent) + if !ok { + bs.fileLog.Printf("event is not a rows event, got: %v\n", ev.Header.EventType) + return nil + } + + table := string(rowEv.Table.Table) + if table != bs.TableName { + bs.fileLog.Printf("rows event is not the required table, get %v\n", table) + return nil + } + + if len(rowEv.Rows) == 2 { + before = bs.formatRow(rowEv.Rows[0]) + after = bs.formatRow(rowEv.Rows[1]) + } else { + before = nil + after = bs.formatRow(rowEv.Rows[0]) + } + + return &WriteEvent{ + event: ev, + before: before, + after: after, + } +} + +func (bs *BinlogSyncer) makeTableIndex(ev *WriteEvent) []string { + index := make([]string, len(bs.TableIndex)) + for i, k := range bs.TableIndex { + if ev.before != nil { + index[i] = fmt.Sprintf("%v", ev.before[k]) + } else { + index[i] = fmt.Sprintf("%v", ev.after[k]) + } + } + + return index +} diff --git a/go/src/app/binlogsync/config.yaml b/go/src/app/binlogsync/config.yaml new file mode 100644 index 0000000..09f3234 --- /dev/null +++ b/go/src/app/binlogsync/config.yaml @@ -0,0 +1,69 @@ +worker1: + WriteWorkerCnt: 2 + + SourceConn: + Addr: 127.0.0.1:3333 + Host: 127.0.0.1 + Port: "3333" + User: admin + Password: password + DBName: mysql + + DBConfig: + 4444: + Addr: 127.0.0.2:4444 + Host: 127.0.0.2 + Port: "4444" + User: admin + Password: password + DBName: mysql + + Shards: + - + From: [field0, field1, field2, ...] + Table: destination table + DBPort: "4444" + + TableName: sorce table + TableField: [] + TableShard: [] + TableIndex: [] + + GTIDSet: string + BinlogFile: mysql-bin.000001 + BinlogPos: 4 + +worker2: + WriteWorkerCnt: 2 + + SourceConn: + Addr: 127.0.0.1:5555 + Host: 127.0.0.1 + Port: "5555" + User: admin + Password: password + DBName: mysql + + DBConfig: + 6666: + Addr: 127.0.0.2:6666 + Host: 127.0.0.2 + Port: "6666" + User: admin + Password: password + DBName: mysql + + Shards: + - + From: [field0, field1, field2, ...] + Table: destination table + DBPort: "6666" + + TableName: source table + TableField: [] + TableShard: [] + TableIndex: [] + + GTIDSet: string + BinlogFile: mysql-bin.000001 + BinlogPos: 4 diff --git a/go/src/app/binlogsync/main.go b/go/src/app/binlogsync/main.go new file mode 100644 index 0000000..a228017 --- /dev/null +++ b/go/src/app/binlogsync/main.go @@ -0,0 +1,84 @@ +package main + +import ( + "flag" + "fmt" + "log" + "os" + "sync" +) + +var ( + logFileName = "binlog_sync.out" + confName = "./config.json" + channelCapacity = 10240 + + logFile *os.File + shellLog *log.Logger + + mainWG *sync.WaitGroup +) + +type Config struct { + + // WorkerCnt specifies how many goroutine used to execute sql statement + WorkerCnt int `yaml:"WriteWorkerCnt"` + + SourceConn Connection `yaml:"SourceConn"` + + DBConfig map[string]Connection `yaml:"DBConfig"` + Shards []DBShard `yaml:"Shards"` + + TableName string `yaml:"TableName"` + TableField []string `yaml:"TableField"` + TableShard []string `yaml:"TableShard"` + TableIndex []string `yaml:"TableIndex"` + + // if specifies GTIDSet, binlog file and pos will be ignored + GTIDSet string `yaml:"GTIDSet"` + + BinlogFile string `yaml:"BinlogFile"` + BinlogPos int32 `yaml:"BinlogPos"` +} + +func main() { + + // set log + var err error + logFile, err = os.Create(logFileName) + if err != nil { + panic(fmt.Sprintf("create log file failed: %v\n", err)) + } + defer logFile.Close() + + shellLog = log.New(os.Stdout, "", 0) + + // read config + var confList = make(map[string]*Config) + err = unmarshalYAML(confName, confList) + if err != nil { + fmt.Printf("read config file failed: %v\n", err) + return + } + + controller := NewController() + + for w, conf := range confList { + syncer := NewBinlogSyncer(conf) + controller.AddWorker(&Worker{ + name: w, + syncer: syncer, + config: conf, + }) + go syncer.Sync() + } + + controller.Listen("8888") +} + +func init() { + flag.StringVar(&logFileName, "log", logFileName, "file name to output error log") + flag.StringVar(&confName, "config", confName, "configration file path") + + flag.Parse() +} diff --git a/go/src/app/binlogsync/sqlUtil.go b/go/src/app/binlogsync/sqlUtil.go new file mode 100644 index 0000000..1c6612a --- /dev/null +++ b/go/src/app/binlogsync/sqlUtil.go @@ -0,0 +1,131 @@ +package main + +import ( + "fmt" + "strconv" + "strings" + + "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" +) + +func makeUpdateSql(table string, idxField, tbField, idxValue, tbValue []string) string { + var setClause string + var whereClause string + var limitClause string + var tableClause string + + tableClause = quote(table) + setClause = makeSqlCondition(tbField, tbValue, "=", ", ") + whereClause = makeWhereClause(idxField, idxValue) + limitClause = "LIMIT 1" + + return fmt.Sprintf("UPDATE %s SET %s%s%s;", tableClause, setClause, whereClause, limitClause) +} + +func makeInsertSql(table string, tbField, tbValue []string) string { + var fldClause string + var valClause string + var tableClause string + + tableClause = quote(table) + + fld := make([]string, len(tbField)) + val := make([]string, len(tbValue)) + + for i := 0; i < len(tbField); i++ { + fld[i] = quote(tbField[i]) + val[i] = "\"" + mysql.Escape(tbValue[i]) + "\"" + } + + fldClause = "(" + strings.Join(fld, ", ") + ")" + valClause = "(" + strings.Join(val, ", ") + ")" + + return fmt.Sprintf("INSERT INTO %s %s VALUES %s;", tableClause, fldClause, valClause) +} + +func makeDeleteSql(table string, idxField, idxValue []string) string { + var whereClause string + var limitClause string + var tableClause string + + tableClause = quote(table) + + whereClause = makeWhereClause(idxField, idxValue) + limitClause = "LIMIT 1" + + return fmt.Sprintf("DELETE FROM %s %s %s;", tableClause, whereClause, limitClause) +} + +func makeWhereClause(fields, values []string) string { + return fmt.Sprintf("WHERE %s", makeSqlCondition(fields, values, "=", " AND ")) +} + +func makeSqlCondition(fields, values []string, operator, formatter string) string { + conds := make([]string, len(fields)) + + for i, k := range fields { + conds[i] = fmt.Sprintf("%s%s%s", quote(k), operator, "\""+mysql.Escape(values[i])+"\"") + } + + return strings.Join(conds, formatter) +} + +func quote(src string) string { + + rst := strings.Replace(src, "`", "\\`", -1) + + return "`" + rst + "`" +} + +func newBinlogReader(conn *Connection, serverID int32) (*replication.BinlogSyncer, error) { + port, err := strconv.ParseInt(conn.Port, 10, 16) + if err != nil { + return nil, err + } + + binlogCfg := replication.BinlogSyncerConfig{ + ServerID: uint32(serverID), + Flavor: "mysql", + Host: conn.Host, + Port: uint16(port), + User: conn.User, + Password: conn.Password, + } + + return replication.NewBinlogSyncer(binlogCfg), nil +} + +func newBinlogReaderByPosition(conn *Connection, binlogFile string, binlogPos int32, serverID int32) (*replication.BinlogStreamer, error) { + + reader, err := newBinlogReader(conn, serverID) + if err != nil { + return nil, err + } + + streamer, err := reader.StartSync(mysql.Position{binlogFile, uint32(binlogPos)}) + if err != nil { + return nil, err + } + + return streamer, nil +} + +func newBinlogReaderByGTID(conn *Connection, GTID string, serverID int32) (*replication.BinlogStreamer, error) { + reader, err := newBinlogReader(conn, serverID) + if err != nil { + return nil, err + } + + gtidSet, err := mysql.ParseMysqlGTIDSet(GTID) + if err != nil { + return nil, err + } + + streamer, err := reader.StartSyncGTID(gtidSet) + if err != nil { + return nil, err + } + + return streamer, nil +} diff --git a/go/src/app/binlogsync/util.go b/go/src/app/binlogsync/util.go new file mode 100644 index 0000000..8404437 --- /dev/null +++ b/go/src/app/binlogsync/util.go @@ -0,0 +1,114 @@ +package main + +import ( + "errors" + "fmt" + "hash/fnv" + "io/ioutil" + "reflect" + "strings" + + yaml "gopkg.in/yaml.v2" +) + +func hashStringSliceToInt32(src []string) (int64, error) { + h := fnv.New32() + for _, s := range src { + _, err := h.Write([]byte(s)) + if err != nil { + return 0, err + } + } + + return int64(h.Sum32()), nil +} + +func compareSlice(a, b []interface{}) (int, error) { + lenA := len(a) + lenB := len(b) + + for i, v := range a { + + if i >= lenB { + return 1, nil + } + + var rst int + + switch v.(type) { + case int, int8, int16, int32, int64: + ai, _ := interfaceToInt64(v) + bi, _ := interfaceToInt64(b[i]) + rst = compareInt(ai, bi) + case string: + rst = strings.Compare(v.(string), b[i].(string)) + default: + return 0, errors.New(fmt.Sprintf("unknow type of element: %v", reflect.TypeOf(v))) + } + + if rst == 0 { + continue + } + + return rst, nil + } + + if lenA < lenB { + return -1, nil + } + + return 0, nil +} + +func compareInt(a, b int64) int { + if a == b { + return 0 + } + + if a > b { + return 1 + } else { + return -1 + } +} + +func interfaceToInt64(v interface{}) (int64, error) { + switch v.(type) { + case int: + return int64(v.(int)), nil + case int8: + return int64(v.(int8)), nil + case int16: + return int64(v.(int16)), nil + case int32: + return int64(v.(int32)), nil + case int64: + return v.(int64), nil + default: + return 0, errors.New(fmt.Sprintf("unknow type of value:%v, type: %T", v, v)) + } +} + +func unmarshalYAML(filename string, v interface{}) error { + data, err := ioutil.ReadFile(filename) + if err != nil { + return err + } + + err = yaml.Unmarshal(data, v) + if err != nil { + return err + } + + return nil +} + +func marshalYAML(v interface{}) (string, error) { + + data, err := yaml.Marshal(v) + if err != nil { + return "", err + } + + return string(data), nil +}