Skip to content

Commit 2dec1fe

Browse files
authored
Merge branch 'master' into default_connattrs
2 parents b401eea + df895b5 commit 2dec1fe

File tree

6 files changed

+80
-6
lines changed

6 files changed

+80
-6
lines changed

canal/sync.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -253,11 +253,11 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
253253
}
254254
var action string
255255
switch e.Header.EventType {
256-
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
256+
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2, replication.MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
257257
action = InsertAction
258-
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
258+
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2, replication.MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
259259
action = DeleteAction
260-
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
260+
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2, replication.MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
261261
action = UpdateAction
262262
default:
263263
return errors.Errorf("%s not supported now", e.Header.EventType)

mysql/util.go

+20
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package mysql
22

33
import (
4+
"bytes"
5+
"compress/zlib"
46
"crypto/rand"
57
"crypto/rsa"
68
"crypto/sha1"
@@ -92,6 +94,24 @@ func EncryptPassword(password string, seed []byte, pub *rsa.PublicKey) ([]byte,
9294
return rsa.EncryptOAEP(sha1v, rand.Reader, pub, plain, nil)
9395
}
9496

97+
func DecompressMariadbData(data []byte) ([]byte, error) {
98+
// algorithm always 0=zlib
99+
// algorithm := (data[pos] & 0x07) >> 4
100+
headerSize := int(data[0] & 0x07)
101+
uncompressedDataSize := BFixedLengthInt(data[1 : 1+headerSize])
102+
uncompressedData := make([]byte, uncompressedDataSize)
103+
r, err := zlib.NewReader(bytes.NewReader(data[1+headerSize:]))
104+
if err != nil {
105+
return nil, err
106+
}
107+
defer r.Close()
108+
_, err = io.ReadFull(r, uncompressedData)
109+
if err != nil {
110+
return nil, err
111+
}
112+
return uncompressedData, nil
113+
}
114+
95115
// AppendLengthEncodedInteger: encodes a uint64 value and appends it to the given bytes slice
96116
func AppendLengthEncodedInteger(b []byte, n uint64) []byte {
97117
switch {

replication/const.go

+15
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ const (
101101
MARIADB_BINLOG_CHECKPOINT_EVENT
102102
MARIADB_GTID_EVENT
103103
MARIADB_GTID_LIST_EVENT
104+
MARIADB_START_ENCRYPTION_EVENT
105+
MARIADB_QUERY_COMPRESSED_EVENT
106+
MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1
107+
MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1
108+
MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1
104109
)
105110

106111
func (e EventType) String() string {
@@ -197,6 +202,16 @@ func (e EventType) String() string {
197202
return "TransactionPayloadEvent"
198203
case HEARTBEAT_LOG_EVENT_V2:
199204
return "HeartbeatLogEventV2"
205+
case MARIADB_START_ENCRYPTION_EVENT:
206+
return "MariadbStartEncryptionEvent"
207+
case MARIADB_QUERY_COMPRESSED_EVENT:
208+
return "MariadbQueryCompressedEvent"
209+
case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
210+
return "MariadbWriteRowsCompressedEventV1"
211+
case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
212+
return "MariadbUpdateRowsCompressedEventV1"
213+
case MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
214+
return "MariadbDeleteRowsCompressedEventV1"
200215

201216
default:
202217
return "UnknownEvent"

replication/event.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,9 @@ type QueryEvent struct {
297297
Schema []byte
298298
Query []byte
299299

300+
// for mariadb QUERY_COMPRESSED_EVENT
301+
compressed bool
302+
300303
// in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use
301304
GSet GTIDSet
302305
}
@@ -328,7 +331,15 @@ func (e *QueryEvent) Decode(data []byte) error {
328331
//skip 0x00
329332
pos++
330333

331-
e.Query = data[pos:]
334+
if e.compressed {
335+
decompressedQuery, err := DecompressMariadbData(data[pos:])
336+
if err != nil {
337+
return err
338+
}
339+
e.Query = decompressedQuery
340+
} else {
341+
e.Query = data[pos:]
342+
}
332343
return nil
333344
}
334345

replication/parser.go

+18
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,10 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
249249
switch h.EventType {
250250
case QUERY_EVENT:
251251
e = &QueryEvent{}
252+
case MARIADB_QUERY_COMPRESSED_EVENT:
253+
e = &QueryEvent{
254+
compressed: true,
255+
}
252256
case XID_EVENT:
253257
e = &XIDEvent{}
254258
case TABLE_MAP_EVENT:
@@ -270,7 +274,11 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
270274
WRITE_ROWS_EVENTv2,
271275
UPDATE_ROWS_EVENTv2,
272276
DELETE_ROWS_EVENTv2,
277+
MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1,
278+
MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1,
279+
MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1,
273280
PARTIAL_UPDATE_ROWS_EVENT: // Extension of UPDATE_ROWS_EVENT, allowing partial values according to binlog_row_value_options
281+
274282
e = p.newRowsEvent(h)
275283
case ROWS_QUERY_EVENT:
276284
e = &RowsQueryEvent{}
@@ -412,6 +420,16 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
412420
case UPDATE_ROWS_EVENTv1:
413421
e.Version = 1
414422
e.needBitmap2 = true
423+
case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
424+
e.Version = 1
425+
e.compressed = true
426+
case MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
427+
e.Version = 1
428+
e.compressed = true
429+
case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:
430+
e.Version = 1
431+
e.compressed = true
432+
e.needBitmap2 = true
415433
case WRITE_ROWS_EVENTv2:
416434
e.Version = 2
417435
case UPDATE_ROWS_EVENTv2:

replication/row_event.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,9 @@ type RowsEvent struct {
830830
tables map[uint64]*TableMapEvent
831831
needBitmap2 bool
832832

833+
// for mariadb *_COMPRESSED_EVENT_V1
834+
compressed bool
835+
833836
eventType EventType
834837

835838
Table *TableMapEvent
@@ -970,9 +973,9 @@ func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) {
970973

971974
var rowImageType EnumRowImageType
972975
switch e.eventType {
973-
case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2:
976+
case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
974977
rowImageType = EnumRowImageTypeWriteAI
975-
case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2:
978+
case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2, MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1:
976979
rowImageType = EnumRowImageTypeDeleteBI
977980
default:
978981
rowImageType = EnumRowImageTypeUpdateBI
@@ -1002,6 +1005,13 @@ func (e *RowsEvent) Decode(data []byte) error {
10021005
if err != nil {
10031006
return err
10041007
}
1008+
if e.compressed {
1009+
uncompressedData, err := DecompressMariadbData(data[pos:])
1010+
if err != nil {
1011+
return err
1012+
}
1013+
return e.DecodeData(0, uncompressedData)
1014+
}
10051015
return e.DecodeData(pos, data)
10061016
}
10071017

0 commit comments

Comments
 (0)