Skip to content

Commit 35983ce

Browse files
lintanghuisiddontang
authored andcommitted
使用sqlparser替换正则进行queryevent的解析 (go-mysql-org#382)
* use sqlparser instead of regexp
1 parent 8804d83 commit 35983ce

File tree

6 files changed

+262
-118
lines changed

6 files changed

+262
-118
lines changed

canal/canal.go

+14-12
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
"github.com/pingcap/errors"
16+
"github.com/pingcap/parser"
1617
"github.com/siddontang/go-log/log"
1718
"github.com/siddontang/go-mysql/client"
1819
"github.com/siddontang/go-mysql/dump"
@@ -28,6 +29,7 @@ type Canal struct {
2829

2930
cfg *Config
3031

32+
parser *parser.Parser
3133
master *masterInfo
3234
dumper *dump.Dumper
3335
dumped bool
@@ -65,7 +67,7 @@ func NewCanal(cfg *Config) (*Canal, error) {
6567

6668
c.dumpDoneCh = make(chan struct{})
6769
c.eventHandler = &DummyEventHandler{}
68-
70+
c.parser = parser.New()
6971
c.tables = make(map[string]*schema.Table)
7072
if c.cfg.DiscardNoMetaRowEvent {
7173
c.errorTablesGetTime = make(map[string]time.Time)
@@ -408,17 +410,17 @@ func (c *Canal) checkBinlogRowFormat() error {
408410

409411
func (c *Canal) prepareSyncer() error {
410412
cfg := replication.BinlogSyncerConfig{
411-
ServerID: c.cfg.ServerID,
412-
Flavor: c.cfg.Flavor,
413-
User: c.cfg.User,
414-
Password: c.cfg.Password,
415-
Charset: c.cfg.Charset,
416-
HeartbeatPeriod: c.cfg.HeartbeatPeriod,
417-
ReadTimeout: c.cfg.ReadTimeout,
418-
UseDecimal: c.cfg.UseDecimal,
419-
ParseTime: c.cfg.ParseTime,
420-
SemiSyncEnabled: c.cfg.SemiSyncEnabled,
421-
MaxReconnectAttempts: c.cfg.MaxReconnectAttempts,
413+
ServerID: c.cfg.ServerID,
414+
Flavor: c.cfg.Flavor,
415+
User: c.cfg.User,
416+
Password: c.cfg.Password,
417+
Charset: c.cfg.Charset,
418+
HeartbeatPeriod: c.cfg.HeartbeatPeriod,
419+
ReadTimeout: c.cfg.ReadTimeout,
420+
UseDecimal: c.cfg.UseDecimal,
421+
ParseTime: c.cfg.ParseTime,
422+
SemiSyncEnabled: c.cfg.SemiSyncEnabled,
423+
MaxReconnectAttempts: c.cfg.MaxReconnectAttempts,
422424
TimestampStringLocation: c.cfg.TimestampStringLocation,
423425
}
424426

canal/canal_test.go

+101-61
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package canal
22

33
import (
4-
"bytes"
54
"flag"
65
"fmt"
76
"testing"
87
"time"
98

109
. "github.com/pingcap/check"
1110
"github.com/pingcap/errors"
11+
"github.com/pingcap/parser"
1212
"github.com/siddontang/go-log/log"
1313
"github.com/siddontang/go-mysql/mysql"
1414
)
@@ -140,98 +140,138 @@ func (s *canalTestSuite) TestCanalFilter(c *C) {
140140

141141
func TestCreateTableExp(t *testing.T) {
142142
cases := []string{
143-
"CREATE TABLE `mydb.mytable` (`id` int(10)) ENGINE=InnoDB",
144-
"CREATE TABLE `mytable` (`id` int(10)) ENGINE=InnoDB",
145-
"CREATE TABLE IF NOT EXISTS `mytable` (`id` int(10)) ENGINE=InnoDB",
146-
"CREATE TABLE IF NOT EXISTS mytable (`id` int(10)) ENGINE=InnoDB",
143+
"CREATE TABLE /*generated by server */ mydb.mytable (`id` int(10)) ENGINE=InnoDB",
144+
"CREATE TABLE `mydb`.`mytable` (`id` int(10)) ENGINE=InnoDB",
145+
"CREATE TABLE IF NOT EXISTS mydb.`mytable` (`id` int(10)) ENGINE=InnoDB",
146+
"CREATE TABLE IF NOT EXISTS `mydb`.mytable (`id` int(10)) ENGINE=InnoDB",
147147
}
148-
table := []byte("mytable")
149-
db := []byte("mydb")
148+
table := "mytable"
149+
db := "mydb"
150+
pr := parser.New()
150151
for _, s := range cases {
151-
m := expCreateTable.FindSubmatch([]byte(s))
152-
mLen := len(m)
153-
if m == nil || !bytes.Equal(m[mLen-1], table) || (len(m[mLen-2]) > 0 && !bytes.Equal(m[mLen-2], db)) {
154-
t.Fatalf("TestCreateTableExp: case %s failed\n", s)
152+
stmts, _, err := pr.Parse(s, "", "")
153+
if err != nil {
154+
t.Fatalf("TestCreateTableExp:case %s failed\n", s)
155+
}
156+
for _, st := range stmts {
157+
nodes := parseStmt(st)
158+
if len(nodes) == 0 {
159+
continue
160+
}
161+
if nodes[0].db != db || nodes[0].table != table {
162+
t.Fatalf("TestCreateTableExp:case %s failed\n", s)
163+
}
155164
}
156165
}
157166
}
158-
159167
func TestAlterTableExp(t *testing.T) {
160168
cases := []string{
161-
"ALTER TABLE `mydb`.`mytable` ADD `field2` DATE NULL AFTER `field1`;",
169+
"ALTER TABLE /*generated by server*/ `mydb`.`mytable` ADD `field2` DATE NULL AFTER `field1`;",
162170
"ALTER TABLE `mytable` ADD `field2` DATE NULL AFTER `field1`;",
163171
"ALTER TABLE mydb.mytable ADD `field2` DATE NULL AFTER `field1`;",
164172
"ALTER TABLE mytable ADD `field2` DATE NULL AFTER `field1`;",
165173
"ALTER TABLE mydb.mytable ADD field2 DATE NULL AFTER `field1`;",
166174
}
167175

168-
table := []byte("mytable")
169-
db := []byte("mydb")
176+
table := "mytable"
177+
db := "mydb"
178+
pr := parser.New()
170179
for _, s := range cases {
171-
m := expAlterTable.FindSubmatch([]byte(s))
172-
mLen := len(m)
173-
if m == nil || !bytes.Equal(m[mLen-1], table) || (len(m[mLen-2]) > 0 && !bytes.Equal(m[mLen-2], db)) {
174-
t.Fatalf("TestAlterTableExp: case %s failed\n", s)
180+
stmts, _, err := pr.Parse(s, "", "")
181+
if err != nil {
182+
t.Fatalf("TestAlterTableExp:case %s failed\n", s)
183+
}
184+
for _, st := range stmts {
185+
nodes := parseStmt(st)
186+
if len(nodes) == 0 {
187+
continue
188+
}
189+
rdb := nodes[0].db
190+
rtable := nodes[0].table
191+
if (len(rdb) > 0 && rdb != db) || rtable != table {
192+
t.Fatalf("TestAlterTableExp:case %s failed db %s,table %s\n", s, rdb, rtable)
193+
}
175194
}
176195
}
177196
}
178197

179198
func TestRenameTableExp(t *testing.T) {
180199
cases := []string{
181-
"rename table `mydb`.`mytable` to `mydb`.`mytable1`",
182-
"rename table `mytable` to `mytable1`",
183-
"rename table mydb.mytable to mydb.mytable1",
184-
"rename table mytable to mytable1",
185-
186-
"rename table `mydb`.`mytable` to `mydb`.`mytable2`, `mydb`.`mytable3` to `mydb`.`mytable1`",
187-
"rename table `mytable` to `mytable2`, `mytable3` to `mytable1`",
188-
"rename table mydb.mytable to mydb.mytable2, mydb.mytable3 to mydb.mytable1",
189-
"rename table mytable to mytable2, mytable3 to mytable1",
200+
"rename /* generate by server */table `mydb`.`mytable0` to `mydb`.`mytable0tmp`",
201+
"rename table `mytable0` to `mytable0tmp`",
202+
"rename table mydb.mytable0 to mydb.mytable0tmp",
203+
"rename table mytable0 to mytable0tmp",
204+
205+
"rename table `mydb`.`mytable0` to `mydb`.`mytable0tmp`, `mydb`.`mytable1` to `mydb`.`mytable1tmp`",
206+
"rename table `mytable0` to `mytable0tmp`, `mytable1` to `mytable1tmp`",
207+
"rename table mydb.mytable0 to mydb.mytable0tmp, mydb.mytable1 to mydb.mytable1tmp",
208+
"rename table mytable0 to mytable0tmp, mytable1 to mytabletmp",
190209
}
191-
table := []byte("mytable")
192-
db := []byte("mydb")
210+
baseTable := "mytable"
211+
db := "mydb"
212+
pr := parser.New()
193213
for _, s := range cases {
194-
m := expRenameTable.FindSubmatch([]byte(s))
195-
mLen := len(m)
196-
if m == nil || !bytes.Equal(m[mLen-1], table) || (len(m[mLen-2]) > 0 && !bytes.Equal(m[mLen-2], db)) {
197-
t.Fatalf("TestRenameTableExp: case %s failed\n", s)
214+
stmts, _, err := pr.Parse(s, "", "")
215+
if err != nil {
216+
t.Fatalf("TestRenameTableExp:case %s failed\n", s)
217+
}
218+
for _, st := range stmts {
219+
nodes := parseStmt(st)
220+
if len(nodes) == 0 {
221+
continue
222+
}
223+
for i, node := range nodes {
224+
rdb := node.db
225+
rtable := node.table
226+
table := fmt.Sprintf("%s%d", baseTable, i)
227+
if (len(rdb) > 0 && rdb != db) || rtable != table {
228+
t.Fatalf("TestRenameTableExp:case %s failed db %s,table %s\n", s, rdb, rtable)
229+
}
230+
}
198231
}
199232
}
200233
}
201234

202235
func TestDropTableExp(t *testing.T) {
203236
cases := []string{
204-
"drop table test1",
205-
"DROP TABLE test1",
206-
"DROP TABLE test1",
207-
"DROP table IF EXISTS test.test1",
208-
"drop table `test1`",
209-
"DROP TABLE `test1`",
210-
"DROP table IF EXISTS `test`.`test1`",
211-
"DROP TABLE `test1` /* generated by server */",
212-
"DROP table if exists test1",
213-
"DROP table if exists `test1`",
214-
"DROP table if exists test.test1",
215-
"DROP table if exists `test`.test1",
216-
"DROP table if exists `test`.`test1`",
217-
"DROP table if exists test.`test1`",
218-
"DROP table if exists test.`test1`",
237+
"drop table test0",
238+
"DROP TABLE test0",
239+
"DROP TABLE test0",
240+
"DROP table IF EXISTS test.test0",
241+
"drop table `test0`",
242+
"DROP TABLE `test0`",
243+
"DROP table IF EXISTS `test`.`test0`",
244+
"DROP TABLE `test0` /* generated by server */",
245+
"DROP /*generated by server */ table if exists test0",
246+
"DROP table if exists `test0`",
247+
"DROP table if exists test.test0",
248+
"DROP table if exists `test`.test0",
249+
"DROP table if exists `test`.`test0`",
250+
"DROP table if exists test.`test0`",
251+
"DROP table if exists test.`test0`",
219252
}
220253

221-
table := []byte("test1")
254+
baseTable := "test"
255+
db := "test"
256+
pr := parser.New()
222257
for _, s := range cases {
223-
m := expDropTable.FindSubmatch([]byte(s))
224-
mLen := len(m)
225-
if m == nil {
226-
t.Fatalf("TestDropTableExp: case %s failed\n", s)
227-
return
228-
}
229-
if mLen < 4 {
230-
t.Fatalf("TestDropTableExp: case %s failed\n", s)
231-
return
258+
stmts, _, err := pr.Parse(s, "", "")
259+
if err != nil {
260+
t.Fatalf("TestDropTableExp:case %s failed\n", s)
232261
}
233-
if !bytes.Equal(m[mLen-1], table) {
234-
t.Fatalf("TestDropTableExp: case %s failed\n", s)
262+
for _, st := range stmts {
263+
nodes := parseStmt(st)
264+
if len(nodes) == 0 {
265+
continue
266+
}
267+
for i, node := range nodes {
268+
rdb := node.db
269+
rtable := node.table
270+
table := fmt.Sprintf("%s%d", baseTable, i)
271+
if (len(rdb) > 0 && rdb != db) || rtable != table {
272+
t.Fatalf("TestDropTableExp:case %s failed db %s,table %s\n", s, rdb, rtable)
273+
}
274+
}
235275
}
236276
}
237277
}

canal/expr.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package canal
2+
3+
import (
4+
"io"
5+
6+
"github.com/pingcap/parser/ast"
7+
"github.com/pingcap/parser/format"
8+
)
9+
10+
func init() {
11+
ast.NewValueExpr = newValueExpr
12+
ast.NewParamMarkerExpr = newParamExpr
13+
ast.NewDecimal = func(_ string) (interface{}, error) {
14+
return nil, nil
15+
}
16+
ast.NewHexLiteral = func(_ string) (interface{}, error) {
17+
return nil, nil
18+
}
19+
ast.NewBitLiteral = func(_ string) (interface{}, error) {
20+
return nil, nil
21+
}
22+
}
23+
24+
type paramExpr struct {
25+
valueExpr
26+
}
27+
28+
func newParamExpr(_ int) ast.ParamMarkerExpr {
29+
return &paramExpr{}
30+
}
31+
func (pe *paramExpr) SetOrder(o int) {}
32+
33+
type valueExpr struct {
34+
ast.TexprNode
35+
}
36+
37+
func newValueExpr(_ interface{}) ast.ValueExpr { return &valueExpr{} }
38+
func (ve *valueExpr) SetValue(val interface{}) {}
39+
func (ve *valueExpr) GetValue() interface{} { return nil }
40+
func (ve *valueExpr) GetDatumString() string { return "" }
41+
func (ve *valueExpr) GetString() string { return "" }
42+
func (ve *valueExpr) GetProjectionOffset() int { return 0 }
43+
func (ve *valueExpr) SetProjectionOffset(offset int) {}
44+
func (ve *valueExpr) Restore(ctx *format.RestoreCtx) error { return nil }
45+
func (ve *valueExpr) Accept(v ast.Visitor) (node ast.Node, ok bool) { return }
46+
func (ve *valueExpr) Text() string { return "" }
47+
func (ve *valueExpr) SetText(text string) {}
48+
func (ve *valueExpr) Format(w io.Writer) {}

0 commit comments

Comments
 (0)