From 6b02acee1019bb54d19dceee9cea7d0207515f2d Mon Sep 17 00:00:00 2001 From: Xinze Guo <101622833+azexcy@users.noreply.github.com> Date: Fri, 17 Nov 2023 14:00:51 +0800 Subject: [PATCH] Improve MySQL binlog query event decode --- .../mysql/ingest/MySQLIncrementalDumper.java | 7 ++----- .../netty/MySQLBinlogEventPacketDecoder.java | 15 +++++++++++---- .../MySQLBinlogEventPacketDecoderTest.java | 17 +++++++++++++++++ 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index fd6a290997ca4..e3e093d777cbd 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -116,11 +116,8 @@ protected void runBlocking() { private void handleEvents(final List events) { List dataRecords = new LinkedList<>(); for (AbstractBinlogEvent each : events) { - if (!(each instanceof AbstractRowsEvent)) { - dataRecords.add(createPlaceholderRecord(each)); - continue; - } - dataRecords.addAll(handleEvent(each)); + List records = handleEvent(each); + dataRecords.addAll(records); } if (dataRecords.isEmpty()) { return; diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java index 0bcaabba7fec1..4b3fff759b323 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoder.java @@ -125,6 +125,8 @@ private void processEventWithTX(final AbstractBinlogEvent binlogEvent, final Lis QueryEvent queryEvent = (QueryEvent) binlogEvent; if (TX_BEGIN_SQL.equals(queryEvent.getSql())) { records = new LinkedList<>(); + } else { + out.add(binlogEvent); } } else if (binlogEvent instanceof XidEvent) { records.add(binlogEvent); @@ -165,7 +167,7 @@ private Optional decodeEvent(final MySQLBinlogEventHeader b case DELETE_ROWS_EVENT_V2: return Optional.of(decodeDeleteRowsEventV2(binlogEventHeader, payload)); case QUERY_EVENT: - return Optional.of(decodeQueryEvent(binlogEventHeader.getChecksumLength(), payload)); + return Optional.of(decodeQueryEvent(binlogEventHeader, payload)); case XID_EVENT: return Optional.of(decodeXidEvent(binlogEventHeader, payload)); default: @@ -241,15 +243,20 @@ private PlaceholderEvent decodePlaceholderEvent(final MySQLBinlogEventHeader bin return result; } - private QueryEvent decodeQueryEvent(final int checksumLength, final MySQLPacketPayload payload) { + private QueryEvent decodeQueryEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { int threadId = payload.readInt4(); int executionTime = payload.readInt4(); payload.skipReserved(1); int errorCode = payload.readInt2(); payload.skipReserved(payload.readInt2()); String databaseName = payload.readStringNul(); - String sql = payload.readStringFix(payload.getByteBuf().readableBytes() - checksumLength); - return new QueryEvent(threadId, executionTime, errorCode, databaseName, sql); + String sql = payload.readStringFix(payload.getByteBuf().readableBytes() - binlogEventHeader.getChecksumLength()); + QueryEvent result = new QueryEvent(threadId, executionTime, errorCode, databaseName, sql); + result.setFileName(binlogContext.getFileName()); + result.setPosition(binlogEventHeader.getLogPos()); + result.setTimestamp(binlogEventHeader.getTimestamp()); + result.setServerId(binlogEventHeader.getServerId()); + return result; } private XidEvent decodeXidEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java index 651188775ffbc..3aa102d563334 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/netty/MySQLBinlogEventPacketDecoderTest.java @@ -25,6 +25,7 @@ import io.netty.util.internal.StringUtil; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogContext; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.QueryEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.UpdateRowsEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.WriteRowsEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.XidEvent; @@ -50,6 +51,8 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.when; @@ -108,6 +111,20 @@ void assertDecodeFormatDescriptionEvent() { assertThat(binlogContext.getChecksumLength(), is(4)); } + @Test + void assertDecodeQueryEvent() { + ByteBuf byteBuf = Unpooled.buffer(); + byteBuf.writeBytes(StringUtil.decodeHexDump("00f3e25665020100000087000000c2740f0a0400c9150000000000000400002d000000000000012000a045000000000603737464042d002d00e0000c0164735f3000116df40b00000" + + "0000012ff0064735f300044524f50205441424c452060745f70726f76696e636560202f2a2067656e65726174656420627920736572766572202a2fcefe4ec6")); + List decodedEvents = new LinkedList<>(); + binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents); + assertFalse(decodedEvents.isEmpty()); + Object actual = decodedEvents.get(0); + assertInstanceOf(QueryEvent.class, actual); + assertThat(((QueryEvent) actual).getTimestamp(), is(1700193011L)); + assertThat(((QueryEvent) actual).getPosition(), is(168785090L)); + } + @Test void assertDecodeTableMapEvent() { ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();