diff --git a/.github/workflows/e2e-operation.yml b/.github/workflows/e2e-operation.yml index 8b32ceb6b1808..372ed5d1d74e3 100644 --- a/.github/workflows/e2e-operation.yml +++ b/.github/workflows/e2e-operation.yml @@ -60,12 +60,12 @@ jobs: fail-fast: false matrix: operation: [ transaction, pipeline, showprocesslist ] - image: [ { type: "it.docker.mysql.version", version: "mysql:5.7" }, { type: "it.docker.postgresql.version", version: "postgres:12-alpine" }, { type: "it.docker.opengauss.version", version: "enmotech/opengauss:2.1.0" } ] + image: [ { type: "it.docker.mysql.version", version: "mysql:5.7" }, { type: "it.docker.postgresql.version", version: "postgres:12-alpine" }, { type: "it.docker.opengauss.version", version: "enmotech/opengauss:2.1.0,enmotech/opengauss:3.1.0" } ] exclude: - operation: showprocesslist image: { type: "it.docker.postgresql.version", version: "postgres:12-alpine" } - operation: showprocesslist - image: { type: "it.docker.opengauss.version", version: "enmotech/opengauss:2.1.0" } + image: { type: "it.docker.opengauss.version", version: "enmotech/opengauss:2.1.0,enmotech/opengauss:3.1.0" } steps: - env: changed_operations: ${{ needs.detect-changed-files.outputs.changed_operations }} diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java index e8437ad797007..3aeac60402ca1 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java @@ -19,15 +19,15 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel; +import org.apache.shardingsphere.data.pipeline.core.exception.IngestException; +import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumper; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition; import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader; -import org.apache.shardingsphere.data.pipeline.core.exception.IngestException; import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication; import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin; import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber; @@ -46,12 +46,18 @@ import org.opengauss.replication.PGReplicationStream; import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * WAL dumper of openGauss. @@ -60,6 +66,10 @@ @Slf4j public final class OpenGaussWALDumper extends AbstractPipelineLifecycleRunnable implements IncrementalDumper { + private static final Pattern VERSION_PATTERN = Pattern.compile("^\\(openGauss (\\d)"); + + private static final int DEFAULT_VERSION = 2; + private final IncrementalDumperContext dumperContext; private final AtomicReference walPosition; @@ -74,6 +84,8 @@ public final class OpenGaussWALDumper extends AbstractPipelineLifecycleRunnable private List rowEvents = new LinkedList<>(); + private final AtomicReference currentCsn = new AtomicReference<>(); + public OpenGaussWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()), @@ -110,10 +122,11 @@ protected void runBlocking() { @SneakyThrows(InterruptedException.class) private void dump() throws SQLException { PGReplicationStream stream = null; + int majorVersion = getMajorVersion(); try (PgConnection connection = getReplicationConnectionUnwrap()) { stream = logicalReplication.createReplicationStream(connection, walPosition.get().getLogSequenceNumber(), - OpenGaussIngestPositionManager.getUniqueSlotName(connection, dumperContext.getJobId())); - DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX); + OpenGaussIngestPositionManager.getUniqueSlotName(connection, dumperContext.getJobId()), majorVersion); + DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX, majorVersion >= 3); while (isRunning()) { ByteBuffer message = stream.readPending(); if (null == message) { @@ -122,7 +135,7 @@ private void dump() throws SQLException { } AbstractWALEvent event = decodingPlugin.decode(message, new OpenGaussLogSequenceNumber(stream.getLastReceiveLSN())); if (decodeWithTX) { - processEventWithTX(event); + processEventWithTX(event, majorVersion); } else { processEventIgnoreTX(event); } @@ -138,28 +151,61 @@ private void dump() throws SQLException { } } + private int getMajorVersion() throws SQLException { + StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig(); + try ( + Connection connection = DriverManager.getConnection(dataSourceConfig.getUrl(), dataSourceConfig.getUsername(), dataSourceConfig.getPassword()); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery("SELECT version()")) { + resultSet.next(); + String versionText = resultSet.getString(1); + return parseMajorVersion(versionText); + } + } + + private int parseMajorVersion(final String versionText) { + Matcher matcher = VERSION_PATTERN.matcher(versionText); + boolean isFind = matcher.find(); + log.info("openGauss major version={}, `select version()`={}", isFind ? matcher.group(1) : DEFAULT_VERSION, versionText); + if (isFind) { + return Integer.parseInt(matcher.group(1)); + } + return DEFAULT_VERSION; + } + private PgConnection getReplicationConnectionUnwrap() throws SQLException { return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).unwrap(PgConnection.class); } - private void processEventWithTX(final AbstractWALEvent event) { + private void processEventWithTX(final AbstractWALEvent event, final int majorVersion) { if (event instanceof BeginTXEvent) { + if (majorVersion < 3) { + return; + } + if (!rowEvents.isEmpty()) { + log.warn("Commit event parse have problem, there still has uncommitted row events size={}, ", rowEvents.size()); + } + currentCsn.set(((BeginTXEvent) event).getCsn()); return; } if (event instanceof AbstractRowEvent) { - rowEvents.add((AbstractRowEvent) event); + AbstractRowEvent rowEvent = (AbstractRowEvent) event; + rowEvent.setCsn(currentCsn.get()); + rowEvents.add(rowEvent); return; } if (event instanceof CommitTXEvent) { - Long csn = ((CommitTXEvent) event).getCsn(); List records = new LinkedList<>(); for (AbstractRowEvent each : rowEvents) { - each.setCsn(csn); + if (majorVersion < 3) { + each.setCsn(((CommitTXEvent) event).getCsn()); + } records.add(walEventConverter.convert(each)); } records.add(walEventConverter.convert(event)); channel.push(records); rowEvents = new LinkedList<>(); + currentCsn.set(null); } } diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java index 082134dc0f29a..cc8a37d938968 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussLogicalReplication.java @@ -28,6 +28,7 @@ import org.opengauss.jdbc.PgConnection; import org.opengauss.replication.LogSequenceNumber; import org.opengauss.replication.PGReplicationStream; +import org.opengauss.replication.fluent.logical.ChainedLogicalStreamBuilder; import java.sql.Connection; import java.sql.DriverManager; @@ -86,17 +87,26 @@ private Connection tryConnectingToHAPort(final String jdbcUrl, final Properties * @param connection connection * @param startPosition start position * @param slotName slot name + * @param majorVersion version * @return replication stream * @throws SQLException SQL exception */ - public PGReplicationStream createReplicationStream(final PgConnection connection, final BaseLogSequenceNumber startPosition, final String slotName) throws SQLException { - return connection.getReplicationAPI() + public PGReplicationStream createReplicationStream(final PgConnection connection, final BaseLogSequenceNumber startPosition, final String slotName, + final int majorVersion) throws SQLException { + ChainedLogicalStreamBuilder logicalStreamBuilder = connection.getReplicationAPI() .replicationStream() .logical() .withSlotName(slotName) .withSlotOption("include-xids", true) .withSlotOption("skip-empty-xacts", true) - .withStartPosition((LogSequenceNumber) startPosition.get()) + .withStartPosition((LogSequenceNumber) startPosition.get()); + if (majorVersion < 3) { + return logicalStreamBuilder.start(); + } + return logicalStreamBuilder + .withSlotOption("parallel-decode-num", 10) + .withSlotOption("decode-style", "j") + .withSlotOption("sending-batch", 0) .start(); } } diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java index 8f7396a518291..f15ce626cadb6 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java @@ -57,9 +57,7 @@ public final class MppdbDecodingPlugin implements DecodingPlugin { private final boolean decodeWithTX; - public MppdbDecodingPlugin(final BaseTimestampUtils timestampUtils) { - this(timestampUtils, false); - } + private final boolean decodeParallelly; @Override public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumber logSequenceNumber) { @@ -77,10 +75,18 @@ public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumbe } private AbstractWALEvent decodeDataWithTX(final String dataText) { + if (decodeParallelly) { + return decodeParallelly(dataText); + } else { + return decodeSerially(dataText); + } + } + + private AbstractWALEvent decodeSerially(final String dataText) { AbstractWALEvent result = new PlaceholderEvent(); if (dataText.startsWith("BEGIN")) { int beginIndex = dataText.indexOf("BEGIN") + "BEGIN".length() + 1; - result = new BeginTXEvent(Long.parseLong(dataText.substring(beginIndex))); + result = new BeginTXEvent(Long.parseLong(dataText.substring(beginIndex)), null); } else if (dataText.startsWith("COMMIT")) { int commitBeginIndex = dataText.indexOf("COMMIT") + "COMMIT".length() + 1; int csnBeginIndex = dataText.indexOf("CSN") + "CSN".length() + 1; @@ -91,6 +97,22 @@ private AbstractWALEvent decodeDataWithTX(final String dataText) { return result; } + private AbstractWALEvent decodeParallelly(final String dataText) { + AbstractWALEvent result = new PlaceholderEvent(); + if (dataText.startsWith("BEGIN")) { + int beginIndex = dataText.indexOf("CSN:") + "CSN:".length() + 1; + int firstLsnIndex = dataText.indexOf("first_lsn"); + long csn = firstLsnIndex > 0 ? Long.parseLong(dataText.substring(beginIndex, firstLsnIndex - 1)) : 0L; + result = new BeginTXEvent(null, csn); + } else if (dataText.startsWith("commit") || dataText.startsWith("COMMIT")) { + int beginIndex = dataText.indexOf("xid:") + "xid:".length() + 1; + result = new CommitTXEvent(Long.parseLong(dataText.substring(beginIndex)), null); + } else if (dataText.startsWith("{")) { + result = readTableEvent(dataText); + } + return result; + } + private AbstractWALEvent decodeDataIgnoreTX(final String dataText) { return dataText.startsWith("{") ? readTableEvent(dataText) : new PlaceholderEvent(); } diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumperTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumperTest.java new file mode 100644 index 0000000000000..21abeb075b424 --- /dev/null +++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumperTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.opengauss.ingest; + +import org.apache.shardingsphere.infra.util.reflection.ReflectionUtils; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; + +class OpenGaussWALDumperTest { + + @Test + void assertGetVersion() throws NoSuchMethodException { + OpenGaussWALDumper dumper = mock(OpenGaussWALDumper.class); + int version = ReflectionUtils.invokeMethod(OpenGaussWALDumper.class.getDeclaredMethod("parseMajorVersion", String.class), dumper, + "(openGauss 3.1.0 build ) compiled at 2023-02-17 16:13:51 commit 0 last mr on x86_64-unknown-linux-gnu, compiled by g++ (GCC) 7.3.0, 64-bit"); + assertThat(version, is(3)); + OpenGaussWALDumper mock = mock(OpenGaussWALDumper.class); + version = ReflectionUtils.invokeMethod(OpenGaussWALDumper.class.getDeclaredMethod("parseMajorVersion", String.class), mock, "(openGauss 5.0.1 build )"); + assertThat(version, is(5)); + version = ReflectionUtils.invokeMethod(OpenGaussWALDumper.class.getDeclaredMethod("parseMajorVersion", String.class), mock, "not match"); + assertThat(version, is(2)); + } +} diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java index 221add916f744..3e7726d598aec 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java +++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPluginTest.java @@ -45,6 +45,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -65,7 +66,7 @@ void assertDecodeWriteRowEvent() { tableData.setColumnsName(IntStream.range(0, insertTypes.length).mapToObj(idx -> "data" + idx).toArray(String[]::new)); tableData.setColumnsVal(IntStream.range(0, insertTypes.length).mapToObj(idx -> "'1 2 3'").toArray(String[]::new)); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber); + WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber); assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber)); assertThat(actual.getTableName(), is("test")); IntStream.range(0, insertTypes.length).forEach(each -> assertThat(actual.getAfterRow().get(each), is("1 2 3"))); @@ -80,7 +81,7 @@ void assertDecodeUpdateRowEvent() { tableData.setColumnsType(new String[]{"character varying"}); tableData.setColumnsVal(new String[]{"'1 2 3'"}); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - UpdateRowEvent actual = (UpdateRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber); + UpdateRowEvent actual = (UpdateRowEvent) new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber); assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber)); assertThat(actual.getTableName(), is("test")); assertThat(actual.getAfterRow().get(0), is("1 2 3")); @@ -97,7 +98,7 @@ void assertDecodeDeleteRowEvent() { tableData.setOldKeysName(IntStream.range(0, deleteTypes.length).mapToObj(idx -> "data" + idx).toArray(String[]::new)); tableData.setOldKeysVal(deleteValues); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - DeleteRowEvent actual = (DeleteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber); + DeleteRowEvent actual = (DeleteRowEvent) new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber); assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber)); assertThat(actual.getTableName(), is("test")); IntStream.range(0, deleteTypes.length).forEach(each -> assertThat(actual.getPrimaryKeys().get(each).toString(), is(deleteValues[each]))); @@ -112,7 +113,7 @@ void assertDecodeWriteRowEventWithMoney() { tableData.setColumnsType(new String[]{"money"}); tableData.setColumnsVal(new String[]{"'$1.08'"}); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber); + WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber); assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber)); assertThat(actual.getTableName(), is("test")); Object byteaObj = actual.getAfterRow().get(0); @@ -128,7 +129,7 @@ void assertDecodeWriteRowEventWithBoolean() { tableData.setColumnsType(new String[]{"boolean"}); tableData.setColumnsVal(new String[]{Boolean.TRUE.toString()}); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber); + WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber); assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber)); assertThat(actual.getTableName(), is("test")); Object byteaObj = actual.getAfterRow().get(0); @@ -155,7 +156,7 @@ void assertDecodeWriteRowEventWithDateAndTime() throws SQLException { when(timestampUtils.toTimestamp(null, "2010-12-12")).thenReturn(Timestamp.valueOf("2010-12-12 00:00:00.0")); when(timestampUtils.toTimestamp(null, "2013-12-11 pst")).thenReturn(Timestamp.valueOf("2013-12-11 16:00:00.0")); when(timestampUtils.toTimestamp(null, "2003-04-12 04:05:06")).thenReturn(Timestamp.valueOf("2003-04-12 04:05:00.0")); - WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils)).decode(data, logSequenceNumber); + WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils), false, false).decode(data, logSequenceNumber); assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber)); assertThat(actual.getTableName(), is("test")); IntStream.range(0, insertTypes.length).forEach(each -> assertThat(actual.getAfterRow().get(each).toString(), is(compareValues[each]))); @@ -170,7 +171,7 @@ void assertDecodeWriteRowEventWithByteA() { tableData.setColumnsType(new String[]{"bytea"}); tableData.setColumnsVal(new String[]{"'\\xff00ab'"}); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber); + WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber); assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber)); assertThat(actual.getTableName(), is("test")); Object byteaObj = actual.getAfterRow().get(0); @@ -187,7 +188,7 @@ void assertDecodeWriteRowEventWithRaw() { tableData.setColumnsType(new String[]{"raw"}); tableData.setColumnsVal(new String[]{"'7D'"}); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber); + WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber); assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber)); assertThat(actual.getTableName(), is("test")); Object byteaObj = actual.getAfterRow().get(0); @@ -198,7 +199,7 @@ void assertDecodeWriteRowEventWithRaw() { @Test void assertDecodeUnknownTableType() { ByteBuffer data = ByteBuffer.wrap("unknown".getBytes()); - assertThat(new MppdbDecodingPlugin(null).decode(data, logSequenceNumber), instanceOf(PlaceholderEvent.class)); + assertThat(new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber), instanceOf(PlaceholderEvent.class)); } @Test @@ -210,7 +211,7 @@ void assertDecodeUnknownRowEventType() { tableData.setColumnsType(new String[]{"character varying"}); tableData.setColumnsVal(new String[]{"1 2 3"}); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - assertThrows(IngestException.class, () -> new MppdbDecodingPlugin(null).decode(data, logSequenceNumber)); + assertThrows(IngestException.class, () -> new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber)); } @Test @@ -224,11 +225,11 @@ void assertDecodeTime() throws SQLException { TimestampUtils timestampUtils = mock(TimestampUtils.class); when(timestampUtils.toTime(null, "1 2 3")).thenThrow(new SQLException("")); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - assertThrows(DecodingException.class, () -> new MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils), true).decode(data, logSequenceNumber)); + assertThrows(DecodingException.class, () -> new MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils), true, false).decode(data, logSequenceNumber)); } @Test - void assertDecodeWithXid() { + void assertDecodeWithTx() { MppTableData tableData = new MppTableData(); tableData.setTableName("public.test"); tableData.setOpType("INSERT"); @@ -237,7 +238,7 @@ void assertDecodeWithXid() { tableData.setColumnsVal(new String[]{"'7D'"}); List dataList = Arrays.asList("BEGIN 1", JsonUtils.toJsonString(tableData), JsonUtils.toJsonString(tableData), "COMMIT 1 (at 2022-10-27 04:19:39.476261+00) CSN 3468"); - MppdbDecodingPlugin mppdbDecodingPlugin = new MppdbDecodingPlugin(null, true); + MppdbDecodingPlugin mppdbDecodingPlugin = new MppdbDecodingPlugin(null, true, false); List expectedEvent = new LinkedList<>(); for (String each : dataList) { expectedEvent.add(mppdbDecodingPlugin.decode(ByteBuffer.wrap(each.getBytes()), logSequenceNumber)); @@ -245,13 +246,35 @@ void assertDecodeWithXid() { assertThat(expectedEvent.size(), is(4)); AbstractWALEvent actualFirstEvent = expectedEvent.get(0); assertInstanceOf(BeginTXEvent.class, actualFirstEvent); - assertThat(((BeginTXEvent) actualFirstEvent).getXid(), is(1L)); AbstractWALEvent actualLastEvent = expectedEvent.get(expectedEvent.size() - 1); assertInstanceOf(CommitTXEvent.class, actualLastEvent); assertThat(((CommitTXEvent) actualLastEvent).getCsn(), is(3468L)); assertThat(((CommitTXEvent) actualLastEvent).getXid(), is(1L)); } + @Test + void assertParallelDecodeWithTx() { + MppTableData tableData = new MppTableData(); + tableData.setTableName("public.test"); + tableData.setOpType("INSERT"); + tableData.setColumnsName(new String[]{"data"}); + tableData.setColumnsType(new String[]{"raw"}); + tableData.setColumnsVal(new String[]{"'7D'"}); + List dataList = Arrays.asList("BEGIN CSN: 951909 first_lsn: 5/59825858", JsonUtils.toJsonString(tableData), JsonUtils.toJsonString(tableData), "commit xid: 1006076"); + MppdbDecodingPlugin mppdbDecodingPlugin = new MppdbDecodingPlugin(null, true, true); + List actual = new LinkedList<>(); + for (String each : dataList) { + actual.add(mppdbDecodingPlugin.decode(ByteBuffer.wrap(each.getBytes()), logSequenceNumber)); + } + assertThat(actual.size(), is(4)); + assertInstanceOf(BeginTXEvent.class, actual.get(0)); + assertThat(((BeginTXEvent) actual.get(0)).getCsn(), is(951909L)); + assertThat(((WriteRowEvent) actual.get(1)).getAfterRow().get(0).toString(), is("7D")); + assertThat(((WriteRowEvent) actual.get(2)).getAfterRow().get(0).toString(), is("7D")); + assertThat(((CommitTXEvent) actual.get(3)).getXid(), is(1006076L)); + assertNull(((CommitTXEvent) actual.get(3)).getCsn()); + } + @Test void assertDecodeWithTsrange() { MppTableData tableData = new MppTableData(); @@ -261,7 +284,7 @@ void assertDecodeWithTsrange() { tableData.setColumnsType(new String[]{"tsrange"}); tableData.setColumnsVal(new String[]{"'[\"2020-01-01 00:00:00\",\"2021-01-01 00:00:00\")'"}); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber); + WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber); Object byteaObj = actual.getAfterRow().get(0); assertThat(byteaObj, instanceOf(PGobject.class)); assertThat(byteaObj.toString(), is("[\"2020-01-01 00:00:00\",\"2021-01-01 00:00:00\")")); @@ -276,7 +299,7 @@ void assertDecodeWithDaterange() { tableData.setColumnsType(new String[]{"daterange"}); tableData.setColumnsVal(new String[]{"'[2020-01-02,2021-01-02)'"}); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber); + WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber); Object byteaObj = actual.getAfterRow().get(0); assertThat(byteaObj, instanceOf(PGobject.class)); assertThat(byteaObj.toString(), is("[2020-01-02,2021-01-02)")); @@ -291,7 +314,7 @@ void assertDecodeWithTsquery() { tableData.setColumnsType(new String[]{"tsquery"}); tableData.setColumnsVal(new String[]{"'''fff'' | ''faa'''"}); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber); + WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber); Object byteaObj = actual.getAfterRow().get(0); assertThat(byteaObj.toString(), is("'fff' | 'faa'")); } @@ -305,7 +328,7 @@ void assertDecodeWitTinyint() { tableData.setColumnsType(new String[]{"tinyint"}); tableData.setColumnsVal(new String[]{"255"}); ByteBuffer data = ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()); - WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null).decode(data, logSequenceNumber); + WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber); Object byteaObj = actual.getAfterRow().get(0); assertThat(byteaObj, is(255)); } diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java index e02bd7a5d4f07..0f81972886702 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java @@ -53,7 +53,7 @@ public AbstractWALEvent decode(final ByteBuffer data, final BaseLogSequenceNumbe AbstractWALEvent result; String type = readEventType(data); if (type.startsWith("BEGIN")) { - result = new BeginTXEvent(Long.parseLong(readNextSegment(data))); + result = new BeginTXEvent(Long.parseLong(readNextSegment(data)), null); } else if (type.startsWith("COMMIT")) { result = new CommitTXEvent(Long.parseLong(readNextSegment(data)), null); } else { diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java index b608e04e5c58d..825b5725d79d4 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/BeginTXEvent.java @@ -27,5 +27,7 @@ @Getter public final class BeginTXEvent extends AbstractWALEvent { - private final long xid; + private final Long xid; + + private final Long csn; } diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index dab22d7bd3d87..af0c4de101415 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -144,7 +144,7 @@ private DataRecord getDataRecord(final WriteRowEvent rowsEvent) throws Reflectiv @Test void assertConvertBeginTXEvent() { - BeginTXEvent beginTXEvent = new BeginTXEvent(100); + BeginTXEvent beginTXEvent = new BeginTXEvent(100L, null); beginTXEvent.setLogSequenceNumber(new PostgreSQLLogSequenceNumber(logSequenceNumber)); Record record = walEventConverter.convert(beginTXEvent); assertInstanceOf(PlaceholderRecord.class, record);