Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

以写数据库的方式同步 binlog #7

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open

Conversation

wenbobuaa
Copy link

#5 是一个事儿,那个评论有点多。改动挺大,新开了PR方便看。

从一机器读取 mysql binlog 数据向另一数据库执行 sql 语句进行 binlog 同步。

使用 go 实现,依赖 的 go-mysql 包比较大,目前没有在这个目录。

详细描述: #2

var errTypes = make(map[string]int)

var start = time.Now()
for outStat := range bs.CountCh {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的统计没有保证 先读的 event 先被收集到,记录的binlog位置会有问题,需要修改..

event *replication.BinlogEvent
before map[string]interface{}
after map[string]interface{}
dbInfo *DBInfo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个dbInfo指的是源的还是目标的

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里 是目标的

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

名字上明确下?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯嗯

type OutMessage struct {
Synced int64 `yaml:"Synced"`
Faild int64 `yaml:"Faild"`
Rate float64 `yaml:"Rate(rows/s)"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

直接用RowPerSec?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

额...rows per sec

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

额...需要加复数吗?...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

要加吧

lenA := len(a)
lenB := len(b)
if lenA != lenB {
return 0, errors.New(fmt.Sprintf("length of slices not equals: %d != %d", lenA, lenB))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

脱离这个场景, 类似python里, slice长度不同是可以比较的

var rst int
switch v.(type) {
case int:
ai := reflect.ValueOf(v).Int()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为啥不直接v.(int) reflect好像挺慢啊...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里 v.(type) case 到的 int 包括了 int32 int64 等,但是 v.(int) 只能转化 v 是 int 类型的,不能转化 v 是 int64 这种类型的.. 所以才想用 reflect 都会变成 int64

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

那...为啥不分布case int 和int64...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这样 case 太多,还有一点是这样分 case 的话 a[i].(int) ,b[i].(int64) 不在一个 case 还要其他步骤找出来比,已知问题 yaml 把interface 类型的数字读成 int 类型,binlog 里的数字是 int64... 所以用了reflect 看起来整齐点...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reflect可能会影响性能...

case可以同时捕捉int和int64吗? 都转int64咋样...

}
}

func parseYAML(filename string, v interface{}) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

要不就符合go风格, marshal unmarshal, 或者顺着py风格, load dump

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯叫 marshal 吧

if rst <= 0 {
return true
}
return false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

直接return rst <= 0 好不...

}

func (bs *BinlogSyncer) formatRow(srcRow []interface{}) map[string]interface{} {
// the first column `id` should not put in new rowValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥should not...解释下...我觉得应该放进去啊...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

想的是 这个 id 是自增的一列 ,写语句里不加这行让目标表自增这列

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

binlog复制应该完整复制数据. id只有在咱们的场景里是不用的. 其他地方可能是有逻辑依赖的.


bs.mutex.Lock()
_, err = ev.dbInfo.Conn.Execute(sql)
bs.mutex.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为啥加锁?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的Conn 是几个线程公用的,Execute 实现里面有一个 setSequence 的操作会修改 Conn 的值, 测试出错了看到的

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

连接池考虑下吗先生?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

远程调用加锁太慢了吧...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯 考虑一下连接池

whereClause = makeWhereClause(idxField, idxValue)
limitClause = "LIMIT 1"

return fmt.Sprintf("UPDATE %s SET %s%s%s;", tableClause, setClause, whereClause, limitClause)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

恩...update 如果update的是index fields里的内容这里能处理吗...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

能处理的

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

但是.源库的update不一定是根据index来update的吧...可能只根据primary key的id来update...

update语句在apply binlog时应该是必须用1个delete加1个insert才能完整复现update的逻辑.

况且, 考虑binlog重放, 第2次update这个东西可能会失败.

Copy link
Contributor

@drmingdrmer drmingdrmer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

还没有记录GTID的逻辑吧?


bs.mutex.Lock()
conn := bs.DBPool[shard.DBPort]
if conn == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是连接池的实现吗? 没看到额

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants