Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

Expand Down Expand Up @@ -218,6 +217,11 @@ private void ensureCompatibility(EventDataDeserializer eventDataDeserializer) {
compatibilitySet.contains(CompatibilityMode.INTEGER_AS_BYTE_ARRAY)
);
}
if (eventDataDeserializer instanceof TransactionPayloadEventDataDeserializer) {
TransactionPayloadEventDataDeserializer deserializer =
(TransactionPayloadEventDataDeserializer) eventDataDeserializer;
deserializer.setCompatibilityModes(compatibilitySet);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;

/**
* @author <a href="mailto:somesh.malviya@booking.com">Somesh Malviya</a>
Expand All @@ -34,6 +36,12 @@ public class TransactionPayloadEventDataDeserializer implements EventDataDeseria
public static final int OTW_PAYLOAD_COMPRESSION_TYPE_FIELD = 2;
public static final int OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD = 3;

private EnumSet<EventDeserializer.CompatibilityMode> compatibilitySet = EnumSet.noneOf(EventDeserializer.CompatibilityMode.class);

public void setCompatibilityModes(EnumSet<EventDeserializer.CompatibilityMode> compatibilityModes) {
this.compatibilitySet = compatibilityModes;
}

@Override
public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
TransactionPayloadEventData eventData = new TransactionPayloadEventData();
Expand Down Expand Up @@ -88,9 +96,15 @@ public TransactionPayloadEventData deserialize(ByteArrayInputStream inputStream)
return eventData;
}

private static ArrayList<Event> getDecompressedEvents(TransactionPayloadEventData eventData) throws IOException {
private ArrayList<Event> getDecompressedEvents(TransactionPayloadEventData eventData) throws IOException {
ArrayList<Event> decompressedEvents = new ArrayList<>();
EventDeserializer transactionPayloadEventDeserializer = new EventDeserializer();
if (!compatibilitySet.isEmpty()) {
EventDeserializer.CompatibilityMode[] compatibilityModes = compatibilitySet.toArray(new EventDeserializer.CompatibilityMode[0]);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would create a similar function, like the one in this file: public void setCompatibilityModes(EnumSet<EventDeserializer.CompatibilityMode> compatibilityModes) in EventDeserializer, to pass the compatibility mode without unpacking it into an array for every transaction payload event.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

EventDeserializer.CompatibilityMode first = compatibilityModes[0];
EventDeserializer.CompatibilityMode[] rest = Arrays.copyOfRange(compatibilityModes, 1, compatibilityModes.length);
transactionPayloadEventDeserializer.setCompatibilityMode(first, rest);
}

try (ZstdInputStream zstdInputStream = new ZstdInputStream(new java.io.ByteArrayInputStream(eventData.getPayload()))) {
ByteArrayInputStream destinationInputStream = new ByteArrayInputStream(zstdInputStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
*/
package com.github.shyiko.mysql.binlog.event.deserialization;

import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TransactionPayloadEventData;
import com.github.shyiko.mysql.binlog.event.XAPrepareEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import org.testng.annotations.Test;

import java.io.IOException;
import java.io.Serializable;

import static org.testng.Assert.assertEquals;
import static org.testng.AssertJUnit.assertTrue;

/**
* @author <a href="mailto:somesh.malviya@booking.com">Somesh Malviya</a>
Expand Down Expand Up @@ -81,6 +86,7 @@ public class TransactionPayloadEventDataDeserializerTest {
" after=[1, Once Upon a Time in the West, 1968, Italy, Western|Action, Claudia Cardinale|Charles Bronson|Henry Fonda|Gabriele Ferzetti|Frank Wolff|Al Mulock|Jason Robards|Woody Strode|Jack Elam|Lionel Stander|Paolo Stoppa|Keenan Wynn|Aldo Sambrell, Sergio Leone, Ennio Morricone, Sergio Leone|Sergio Donati|Dario Argento|Bernardo Bertolucci, Tonino Delli Colli, Paramount Pictures]}\n")
.append("]}")
.toString();
private static final byte[] UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY = new byte[] {1, 0, 0, 0};

@Test
public void deserialize() throws IOException {
Expand All @@ -97,4 +103,60 @@ public void deserialize() throws IOException {
assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType());
assertEquals(UNCOMPRESSED_UPDATE_EVENT, transactionPayloadEventData.getUncompressedEvents().get(2).getData().toString());
}

@Test
public void deserializePropagatingCompatibilityModeToTransactionPayloadEventDataDeserializer() throws IOException {

ByteArrayInputStream dataStream = new ByteArrayInputStream(DATA);

// Mock create target TransactionPayloadEventData DATA event header
final EventHeaderV4 eventHeader = new EventHeaderV4();
eventHeader.setEventType(EventType.TRANSACTION_PAYLOAD);
eventHeader.setEventLength(DATA.length + 19L);
eventHeader.setTimestamp(1646406641000L);
eventHeader.setServerId(223344);


EventHeaderDeserializer eventHeaderDeserializer = new EventHeaderDeserializer() {

private long count = 0L;

private EventHeaderDeserializer defaultEventHeaderDeserializer = new EventHeaderV4Deserializer();

@Override
public EventHeader deserialize(ByteArrayInputStream inputStream) throws IOException {
if (count > 0) {
// uncompressed event header deserialize
return defaultEventHeaderDeserializer.deserialize(inputStream);
}
count++;
// we need to return target TransactionPayloadEventData DATA event header we had mocked
return eventHeader;
}
};

EventDeserializer eventDeserializer = new EventDeserializer(eventHeaderDeserializer, new NullEventDataDeserializer());
eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.INTEGER_AS_BYTE_ARRAY);

Event event = eventDeserializer.nextEvent(dataStream);

assertTrue(event.getHeader().getEventType() == EventType.TRANSACTION_PAYLOAD);
assertTrue(event.getData() instanceof TransactionPayloadEventData);

TransactionPayloadEventData transactionPayloadEventData = event.getData();
assertEquals(COMPRESSION_TYPE, transactionPayloadEventData.getCompressionType());
assertEquals(PAYLOAD_SIZE, transactionPayloadEventData.getPayloadSize());
assertEquals(UNCOMPRESSED_SIZE, transactionPayloadEventData.getUncompressedSize());
assertEquals(NUMBER_OF_UNCOMPRESSED_EVENTS, transactionPayloadEventData.getUncompressedEvents().size());
assertEquals(EventType.QUERY, transactionPayloadEventData.getUncompressedEvents().get(0).getHeader().getEventType());
assertEquals(EventType.TABLE_MAP, transactionPayloadEventData.getUncompressedEvents().get(1).getHeader().getEventType());
assertEquals(EventType.EXT_UPDATE_ROWS, transactionPayloadEventData.getUncompressedEvents().get(2).getHeader().getEventType());
assertEquals(EventType.XID, transactionPayloadEventData.getUncompressedEvents().get(3).getHeader().getEventType());
assertTrue(transactionPayloadEventData.getUncompressedEvents().get(2).getData() instanceof UpdateRowsEventData);

UpdateRowsEventData updateRowsEventData = transactionPayloadEventData.getUncompressedEvents().get(2).getData();
assertEquals(1, updateRowsEventData.getRows().size());
Serializable[] updateBefore = updateRowsEventData.getRows().get(0).getKey();
assertEquals(UNCOMPRESSED_UPDATE_EVENT_BEFORE_ROW_0_BYTE_ARRAY, updateBefore[0]);
}
}
Loading