Skip to content

Commit a8c16ae

Browse files
authored
Merge pull request #482 from huangjunwen/master
2 parents 8250ec4 + 6b953b6 commit a8c16ae

File tree

4 files changed

+516
-1
lines changed

4 files changed

+516
-1
lines changed

replication/binlogsyncer.go

+1
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
148148

149149
b.cfg = cfg
150150
b.parser = NewBinlogParser()
151+
b.parser.SetFlavor(cfg.Flavor)
151152
b.parser.SetRawMode(b.cfg.RawModeEnabled)
152153
b.parser.SetParseTime(b.cfg.ParseTime)
153154
b.parser.SetTimestampStringLocation(b.cfg.TimestampStringLocation)

replication/parser.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ var (
1919
)
2020

2121
type BinlogParser struct {
22+
// "mysql" or "mariadb", if not set, use "mysql" by default
23+
flavor string
24+
2225
format *FormatDescriptionEvent
2326

2427
tables map[uint64]*TableMapEvent
@@ -200,6 +203,10 @@ func (p *BinlogParser) SetVerifyChecksum(verify bool) {
200203
p.verifyChecksum = verify
201204
}
202205

206+
func (p *BinlogParser) SetFlavor(flavor string) {
207+
p.flavor = flavor
208+
}
209+
203210
func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) {
204211
h := new(EventHeader)
205212
err := h.Decode(data)
@@ -234,7 +241,9 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
234241
case XID_EVENT:
235242
e = &XIDEvent{}
236243
case TABLE_MAP_EVENT:
237-
te := &TableMapEvent{}
244+
te := &TableMapEvent{
245+
flavor: p.flavor,
246+
}
238247
if p.format.EventTypeHeaderLengths[TABLE_MAP_EVENT-1] == 6 {
239248
te.tableIDSize = 4
240249
} else {

replication/row_event.go

+324
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
var errMissingTableMapEvent = errors.New("invalid table id, no corresponding table map event")
2020

2121
type TableMapEvent struct {
22+
flavor string
2223
tableIDSize int
2324

2425
TableID uint64
@@ -423,6 +424,116 @@ func (e *TableMapEvent) Dump(w io.Writer) {
423424
fmt.Fprintf(w, "Enum/set default charset: %v\n", e.EnumSetDefaultCharset)
424425
fmt.Fprintf(w, "Enum/set column charset: %v\n", e.EnumSetColumnCharset)
425426

427+
unsignedMap := e.UnsignedMap()
428+
fmt.Fprintf(w, "UnsignedMap: %#v\n", unsignedMap)
429+
430+
collationMap := e.CollationMap()
431+
fmt.Fprintf(w, "CollationMap: %#v\n", collationMap)
432+
433+
enumSetCollationMap := e.EnumSetCollationMap()
434+
fmt.Fprintf(w, "EnumSetCollationMap: %#v\n", enumSetCollationMap)
435+
436+
enumStrValueMap := e.EnumStrValueMap()
437+
fmt.Fprintf(w, "EnumStrValueMap: %#v\n", enumStrValueMap)
438+
439+
setStrValueMap := e.SetStrValueMap()
440+
fmt.Fprintf(w, "SetStrValueMap: %#v\n", setStrValueMap)
441+
442+
geometryTypeMap := e.GeometryTypeMap()
443+
fmt.Fprintf(w, "GeometryTypeMap: %#v\n", geometryTypeMap)
444+
445+
nameMaxLen := 0
446+
for _, name := range e.ColumnName {
447+
if len(name) > nameMaxLen {
448+
nameMaxLen = len(name)
449+
}
450+
}
451+
nameFmt := " %s"
452+
if nameMaxLen > 0 {
453+
nameFmt = fmt.Sprintf(" %%-%ds", nameMaxLen)
454+
}
455+
456+
primaryKey := map[int]struct{}{}
457+
for _, pk := range e.PrimaryKey {
458+
primaryKey[int(pk)] = struct{}{}
459+
}
460+
461+
fmt.Fprintf(w, "Columns: \n")
462+
for i := 0; i < int(e.ColumnCount); i++ {
463+
if len(e.ColumnName) == 0 {
464+
fmt.Fprintf(w, nameFmt, "<n/a>")
465+
} else {
466+
fmt.Fprintf(w, nameFmt, e.ColumnName[i])
467+
}
468+
469+
fmt.Fprintf(w, " type=%-3d", e.realType(i))
470+
471+
if e.IsNumericColumn(i) {
472+
if len(unsignedMap) == 0 {
473+
fmt.Fprintf(w, " unsigned=<n/a>")
474+
} else if unsignedMap[i] {
475+
fmt.Fprintf(w, " unsigned=yes")
476+
} else {
477+
fmt.Fprintf(w, " unsigned=no ")
478+
}
479+
}
480+
if e.IsCharacterColumn(i) {
481+
if len(collationMap) == 0 {
482+
fmt.Fprintf(w, " collation=<n/a>")
483+
} else {
484+
fmt.Fprintf(w, " collation=%d ", collationMap[i])
485+
}
486+
}
487+
if e.IsEnumColumn(i) {
488+
if len(enumSetCollationMap) == 0 {
489+
fmt.Fprintf(w, " enum_collation=<n/a>")
490+
} else {
491+
fmt.Fprintf(w, " enum_collation=%d", enumSetCollationMap[i])
492+
}
493+
494+
if len(enumStrValueMap) == 0 {
495+
fmt.Fprintf(w, " enum=<n/a>")
496+
} else {
497+
fmt.Fprintf(w, " enum=%v", enumStrValueMap[i])
498+
}
499+
}
500+
if e.IsSetColumn(i) {
501+
if len(enumSetCollationMap) == 0 {
502+
fmt.Fprintf(w, " set_collation=<n/a>")
503+
} else {
504+
fmt.Fprintf(w, " set_collation=%d", enumSetCollationMap[i])
505+
}
506+
507+
if len(setStrValueMap) == 0 {
508+
fmt.Fprintf(w, " set=<n/a>")
509+
} else {
510+
fmt.Fprintf(w, " set=%v", setStrValueMap[i])
511+
}
512+
}
513+
if e.IsGeometryColumn(i) {
514+
if len(geometryTypeMap) == 0 {
515+
fmt.Fprintf(w, " geometry_type=<n/a>")
516+
} else {
517+
fmt.Fprintf(w, " geometry_type=%v", geometryTypeMap[i])
518+
}
519+
}
520+
521+
available, nullable := e.Nullable(i)
522+
if !available {
523+
fmt.Fprintf(w, " null=<n/a>")
524+
} else if nullable {
525+
fmt.Fprintf(w, " null=yes")
526+
} else {
527+
fmt.Fprintf(w, " null=no ")
528+
}
529+
530+
if _, ok := primaryKey[i]; ok {
531+
fmt.Fprintf(w, " pri")
532+
}
533+
534+
fmt.Fprintf(w, "\n")
535+
}
536+
426537
fmt.Fprintln(w)
427538
}
428539

@@ -492,6 +603,219 @@ func (e *TableMapEvent) bytesSlice2StrSlice(src [][]byte) []string {
492603
return ret
493604
}
494605

606+
// UnsignedMap returns a map: column index -> unsigned.
607+
// Note that only numeric columns will be returned.
608+
// nil is returned if not available or no numeric columns at all.
609+
func (e *TableMapEvent) UnsignedMap() map[int]bool {
610+
if len(e.SignednessBitmap) == 0 {
611+
return nil
612+
}
613+
p := 0
614+
ret := make(map[int]bool)
615+
for i := 0; i < int(e.ColumnCount); i++ {
616+
if !e.IsNumericColumn(i) {
617+
continue
618+
}
619+
ret[i] = e.SignednessBitmap[p/8]&(1<<uint(7-p%8)) != 0
620+
p++
621+
}
622+
return ret
623+
}
624+
625+
// CollationMap returns a map: column index -> collation id.
626+
// Note that only character columns will be returned.
627+
// nil is returned if not available or no character columns at all.
628+
func (e *TableMapEvent) CollationMap() map[int]uint64 {
629+
return e.collationMap(e.IsCharacterColumn, e.DefaultCharset, e.ColumnCharset)
630+
}
631+
632+
// EnumSetCollationMap returns a map: column index -> collation id.
633+
// Note that only enum or set columns will be returned.
634+
// nil is returned if not available or no enum/set columns at all.
635+
func (e *TableMapEvent) EnumSetCollationMap() map[int]uint64 {
636+
return e.collationMap(e.IsEnumOrSetColumn, e.EnumSetDefaultCharset, e.EnumSetColumnCharset)
637+
}
638+
639+
func (e *TableMapEvent) collationMap(includeType func(int) bool, defaultCharset, columnCharset []uint64) map[int]uint64 {
640+
641+
if len(defaultCharset) != 0 {
642+
defaultCollation := defaultCharset[0]
643+
644+
// character column index -> collation
645+
collations := make(map[int]uint64)
646+
for i := 1; i < len(defaultCharset); i += 2 {
647+
collations[int(defaultCharset[i])] = defaultCharset[i+1]
648+
}
649+
650+
p := 0
651+
ret := make(map[int]uint64)
652+
for i := 0; i < int(e.ColumnCount); i++ {
653+
if !includeType(i) {
654+
continue
655+
}
656+
657+
if collation, ok := collations[p]; ok {
658+
ret[i] = collation
659+
} else {
660+
ret[i] = defaultCollation
661+
}
662+
p++
663+
}
664+
665+
return ret
666+
}
667+
668+
if len(columnCharset) != 0 {
669+
670+
p := 0
671+
ret := make(map[int]uint64)
672+
for i := 0; i < int(e.ColumnCount); i++ {
673+
if !includeType(i) {
674+
continue
675+
}
676+
677+
ret[i] = columnCharset[p]
678+
p++
679+
}
680+
681+
return ret
682+
}
683+
684+
return nil
685+
}
686+
687+
// EnumStrValueMap returns a map: column index -> enum string value.
688+
// Note that only enum columns will be returned.
689+
// nil is returned if not available or no enum columns at all.
690+
func (e *TableMapEvent) EnumStrValueMap() map[int][]string {
691+
return e.strValueMap(e.IsEnumColumn, e.EnumStrValueString())
692+
}
693+
694+
// SetStrValueMap returns a map: column index -> set string value.
695+
// Note that only set columns will be returned.
696+
// nil is returned if not available or no set columns at all.
697+
func (e *TableMapEvent) SetStrValueMap() map[int][]string {
698+
return e.strValueMap(e.IsSetColumn, e.SetStrValueString())
699+
}
700+
701+
func (e *TableMapEvent) strValueMap(includeType func(int) bool, strValue [][]string) map[int][]string {
702+
if len(strValue) == 0 {
703+
return nil
704+
}
705+
p := 0
706+
ret := make(map[int][]string)
707+
for i := 0; i < int(e.ColumnCount); i++ {
708+
if !includeType(i) {
709+
continue
710+
}
711+
ret[i] = strValue[p]
712+
p++
713+
}
714+
return ret
715+
}
716+
717+
// GeometryTypeMap returns a map: column index -> geometry type.
718+
// Note that only geometry columns will be returned.
719+
// nil is returned if not available or no geometry columns at all.
720+
func (e *TableMapEvent) GeometryTypeMap() map[int]uint64 {
721+
if len(e.GeometryType) == 0 {
722+
return nil
723+
}
724+
p := 0
725+
ret := make(map[int]uint64)
726+
for i := 0; i < int(e.ColumnCount); i++ {
727+
if !e.IsGeometryColumn(i) {
728+
continue
729+
}
730+
731+
ret[i] = e.GeometryType[p]
732+
p++
733+
}
734+
return ret
735+
}
736+
737+
// Below realType and IsXXXColumn are base from:
738+
// table_def::type in sql/rpl_utility.h
739+
// Table_map_log_event::print_columns in mysql-8.0/sql/log_event.cc and mariadb-10.5/sql/log_event_client.cc
740+
741+
func (e *TableMapEvent) realType(i int) byte {
742+
743+
typ := e.ColumnType[i]
744+
745+
switch typ {
746+
case MYSQL_TYPE_STRING:
747+
rtyp := byte(e.ColumnMeta[i] >> 8)
748+
if rtyp == MYSQL_TYPE_ENUM || rtyp == MYSQL_TYPE_SET {
749+
return rtyp
750+
}
751+
752+
case MYSQL_TYPE_DATE:
753+
return MYSQL_TYPE_NEWDATE
754+
}
755+
756+
return typ
757+
}
758+
759+
func (e *TableMapEvent) IsNumericColumn(i int) bool {
760+
761+
switch e.realType(i) {
762+
case MYSQL_TYPE_TINY,
763+
MYSQL_TYPE_SHORT,
764+
MYSQL_TYPE_INT24,
765+
MYSQL_TYPE_LONG,
766+
MYSQL_TYPE_LONGLONG,
767+
MYSQL_TYPE_NEWDECIMAL,
768+
MYSQL_TYPE_FLOAT,
769+
MYSQL_TYPE_DOUBLE:
770+
return true
771+
772+
default:
773+
return false
774+
}
775+
776+
}
777+
778+
// IsCharacterColumn returns true if the column type is considered as character type.
779+
// Note that JSON/GEOMETRY types are treated as character type in mariadb.
780+
// (JSON is an alias for LONGTEXT in mariadb: https://mariadb.com/kb/en/json-data-type/)
781+
func (e *TableMapEvent) IsCharacterColumn(i int) bool {
782+
783+
switch e.realType(i) {
784+
case MYSQL_TYPE_STRING,
785+
MYSQL_TYPE_VAR_STRING,
786+
MYSQL_TYPE_VARCHAR,
787+
MYSQL_TYPE_BLOB:
788+
return true
789+
790+
case MYSQL_TYPE_GEOMETRY:
791+
if e.flavor == "mariadb" {
792+
return true
793+
}
794+
return false
795+
796+
default:
797+
return false
798+
}
799+
800+
}
801+
802+
func (e *TableMapEvent) IsEnumColumn(i int) bool {
803+
return e.realType(i) == MYSQL_TYPE_ENUM
804+
}
805+
806+
func (e *TableMapEvent) IsSetColumn(i int) bool {
807+
return e.realType(i) == MYSQL_TYPE_SET
808+
}
809+
810+
func (e *TableMapEvent) IsGeometryColumn(i int) bool {
811+
return e.realType(i) == MYSQL_TYPE_GEOMETRY
812+
}
813+
814+
func (e *TableMapEvent) IsEnumOrSetColumn(i int) bool {
815+
rtyp := e.realType(i)
816+
return rtyp == MYSQL_TYPE_ENUM || rtyp == MYSQL_TYPE_SET
817+
}
818+
495819
// RowsEventStmtEndFlag is set in the end of the statement.
496820
const RowsEventStmtEndFlag = 0x01
497821

0 commit comments

Comments
 (0)