Skip to content

Commit

Permalink
Improve MySQL binlog query event decode
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy committed Nov 17, 2023
1 parent 4983e94 commit 6b02ace
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,8 @@ protected void runBlocking() {
private void handleEvents(final List<AbstractBinlogEvent> events) {
List<Record> dataRecords = new LinkedList<>();
for (AbstractBinlogEvent each : events) {
if (!(each instanceof AbstractRowsEvent)) {
dataRecords.add(createPlaceholderRecord(each));
continue;
}
dataRecords.addAll(handleEvent(each));
List<? extends Record> records = handleEvent(each);
dataRecords.addAll(records);
}
if (dataRecords.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ private void processEventWithTX(final AbstractBinlogEvent binlogEvent, final Lis
QueryEvent queryEvent = (QueryEvent) binlogEvent;
if (TX_BEGIN_SQL.equals(queryEvent.getSql())) {
records = new LinkedList<>();
} else {
out.add(binlogEvent);
}
} else if (binlogEvent instanceof XidEvent) {
records.add(binlogEvent);
Expand Down Expand Up @@ -165,7 +167,7 @@ private Optional<AbstractBinlogEvent> decodeEvent(final MySQLBinlogEventHeader b
case DELETE_ROWS_EVENT_V2:
return Optional.of(decodeDeleteRowsEventV2(binlogEventHeader, payload));
case QUERY_EVENT:
return Optional.of(decodeQueryEvent(binlogEventHeader.getChecksumLength(), payload));
return Optional.of(decodeQueryEvent(binlogEventHeader, payload));
case XID_EVENT:
return Optional.of(decodeXidEvent(binlogEventHeader, payload));
default:
Expand Down Expand Up @@ -241,15 +243,20 @@ private PlaceholderEvent decodePlaceholderEvent(final MySQLBinlogEventHeader bin
return result;
}

private QueryEvent decodeQueryEvent(final int checksumLength, final MySQLPacketPayload payload) {
private QueryEvent decodeQueryEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
int threadId = payload.readInt4();
int executionTime = payload.readInt4();
payload.skipReserved(1);
int errorCode = payload.readInt2();
payload.skipReserved(payload.readInt2());
String databaseName = payload.readStringNul();
String sql = payload.readStringFix(payload.getByteBuf().readableBytes() - checksumLength);
return new QueryEvent(threadId, executionTime, errorCode, databaseName, sql);
String sql = payload.readStringFix(payload.getByteBuf().readableBytes() - binlogEventHeader.getChecksumLength());
QueryEvent result = new QueryEvent(threadId, executionTime, errorCode, databaseName, sql);
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getLogPos());
result.setTimestamp(binlogEventHeader.getTimestamp());
result.setServerId(binlogEventHeader.getServerId());
return result;
}

private XidEvent decodeXidEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.util.internal.StringUtil;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogContext;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.QueryEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.XidEvent;
Expand All @@ -50,6 +51,8 @@
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -108,6 +111,20 @@ void assertDecodeFormatDescriptionEvent() {
assertThat(binlogContext.getChecksumLength(), is(4));
}

@Test
void assertDecodeQueryEvent() {
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeBytes(StringUtil.decodeHexDump("00f3e25665020100000087000000c2740f0a0400c9150000000000000400002d000000000000012000a045000000000603737464042d002d00e0000c0164735f3000116df40b00000"
+ "0000012ff0064735f300044524f50205441424c452060745f70726f76696e636560202f2a2067656e65726174656420627920736572766572202a2fcefe4ec6"));
List<Object> decodedEvents = new LinkedList<>();
binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents);
assertFalse(decodedEvents.isEmpty());
Object actual = decodedEvents.get(0);
assertInstanceOf(QueryEvent.class, actual);
assertThat(((QueryEvent) actual).getTimestamp(), is(1700193011L));
assertThat(((QueryEvent) actual).getPosition(), is(168785090L));
}

@Test
void assertDecodeTableMapEvent() {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
Expand Down

0 comments on commit 6b02ace

Please sign in to comment.