diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java index 776932449b7b1..8dc1282d05a4d 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java @@ -18,9 +18,9 @@ package org.apache.shardingsphere.data.pipeline.opengauss.ingest; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager; import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber; +import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLSlotNameGenerator; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition; import org.opengauss.replication.LogSequenceNumber; @@ -40,26 +40,10 @@ @Slf4j public final class OpenGaussIngestPositionManager implements DialectIngestPositionManager { - private static final String SLOT_NAME_PREFIX = "pipeline"; - private static final String DECODE_PLUGIN = "mppdb_decoding"; private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710"; - /** - * Get the unique slot name by connection. - * - * @param connection connection - * @param slotNameSuffix slot name suffix - * @return the unique name by connection - * @throws SQLException failed when getCatalog - */ - public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException { - // same as PostgreSQL, but length over 64 will throw an exception directly - String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes()); - return String.format("%s_%s", SLOT_NAME_PREFIX, slotName); - } - @Override public WALPosition init(final String data) { return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(data))); @@ -74,7 +58,7 @@ public WALPosition init(final DataSource dataSource, final String slotNameSuffix } private void createSlotIfNotExist(final Connection connection, final String slotNameSuffix) throws SQLException { - String slotName = getUniqueSlotName(connection, slotNameSuffix); + String slotName = PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix); Optional slotInfo = getSlotInfo(connection, slotName); if (!slotInfo.isPresent()) { createSlot(connection, slotName); @@ -132,7 +116,7 @@ private WALPosition getWALPosition(final Connection connection) throws SQLExcept @Override public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException { try (Connection connection = dataSource.getConnection()) { - dropSlotIfExist(connection, getUniqueSlotName(connection, slotNameSuffix)); + dropSlotIfExist(connection, PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix)); } } diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java index b50fcbc6eeede..dd142ff827802 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java @@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin; import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber; import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussTimestampUtils; +import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLSlotNameGenerator; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin; @@ -125,7 +126,7 @@ private void dump() throws SQLException { int majorVersion = getMajorVersion(); try (PgConnection connection = getReplicationConnectionUnwrap()) { stream = logicalReplication.createReplicationStream(connection, walPosition.get().getLogSequenceNumber(), - OpenGaussIngestPositionManager.getUniqueSlotName(connection, dumperContext.getJobId()), majorVersion); + PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, dumperContext.getJobId()), majorVersion); DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX, majorVersion >= 3); while (isRunning()) { ByteBuffer message = stream.readPending(); diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java index 389ba0ace3064..e3d23caea2d9a 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException; import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition; @@ -37,26 +36,10 @@ @Slf4j public final class PostgreSQLIngestPositionManager implements DialectIngestPositionManager { - private static final String SLOT_NAME_PREFIX = "pipeline"; - private static final String DECODE_PLUGIN = "test_decoding"; private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710"; - /** - * Get the unique slot name by connection. - * - * @param connection the connection - * @param slotNameSuffix slot name suffix - * @return the unique name by connection - * @throws SQLException failed when getCatalog - */ - public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException { - // PostgreSQL slot name maximum length can't exceed 64,automatic truncation when the length exceeds the limit - String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes()); - return String.format("%s_%s", SLOT_NAME_PREFIX, slotName); - } - @Override public WALPosition init(final String data) { return new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(data))); @@ -65,7 +48,7 @@ public WALPosition init(final String data) { @Override public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException { try (Connection connection = dataSource.getConnection()) { - createSlotIfNotExist(connection, getUniqueSlotName(connection, slotNameSuffix)); + createSlotIfNotExist(connection, PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix)); return getWALPosition(connection); } } @@ -124,7 +107,7 @@ public void destroy(final DataSource dataSource, final String slotNameSuffix) th } private void dropSlotIfExist(final Connection connection, final String slotNameSuffix) throws SQLException { - String slotName = getUniqueSlotName(connection, slotNameSuffix); + String slotName = PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix); if (!isSlotExisting(connection, slotName)) { log.info("dropSlotIfExist, slot not exist, slotName={}", slotName); return; diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGenerator.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGenerator.java new file mode 100644 index 0000000000000..ab84f236d69b7 --- /dev/null +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGenerator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.postgresql.ingest; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.commons.codec.digest.DigestUtils; + +import java.sql.Connection; +import java.sql.SQLException; + +/** + * PostgreSQL slot name generator. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class PostgreSQLSlotNameGenerator { + + private static final String SLOT_NAME_PREFIX = "pipeline"; + + /** + * Get unique slot name by connection. + * + * @param connection connection + * @param slotNameSuffix slot name suffix + * @return unique name by connection + * @throws SQLException failed when get catalog + */ + public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException { + String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes()); + return String.format("%s_%s", SLOT_NAME_PREFIX, slotName); + } +} diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java index 2d1c5d9dff6bc..5674040f903b6 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java @@ -114,7 +114,7 @@ private void dump() throws SQLException { // TODO use unified PgConnection try ( Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()); - PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLIngestPositionManager.getUniqueSlotName(connection, dumperContext.getJobId()), + PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, dumperContext.getJobId()), walPosition.get().getLogSequenceNumber())) { PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils()); DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils); diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGeneratorTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGeneratorTest.java new file mode 100644 index 0000000000000..7db70a30e6f69 --- /dev/null +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGeneratorTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.postgresql.ingest; + +import org.junit.jupiter.api.Test; + +import java.sql.Connection; +import java.sql.SQLException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class PostgreSQLSlotNameGeneratorTest { + + @Test + void assertGetUniqueSlotName() throws SQLException { + Connection connection = mock(Connection.class); + when(connection.getCatalog()).thenReturn("foo_catalog"); + assertThat(PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, "foo_slot"), is("pipeline_9a2b4a79ce8b4fca2835b1e947c446eb")); + } +} diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index ec903e7e922df..9187cb58d9fe0 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -25,12 +25,12 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException; -import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber; +import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException; +import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; import org.apache.shardingsphere.test.mock.AutoMockExtension; import org.apache.shardingsphere.test.mock.StaticMockSettings; import org.junit.jupiter.api.AfterEach; @@ -39,6 +39,8 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.internal.configuration.plugins.Plugins; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; import org.postgresql.jdbc.PgConnection; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.replication.PGReplicationStream; @@ -59,7 +61,8 @@ import static org.mockito.Mockito.when; @ExtendWith(AutoMockExtension.class) -@StaticMockSettings(PostgreSQLIngestPositionManager.class) +@StaticMockSettings({PostgreSQLIngestPositionManager.class, PostgreSQLSlotNameGenerator.class}) +@MockitoSettings(strictness = Strictness.LENIENT) class PostgreSQLWALDumperTest { @Mock @@ -131,9 +134,8 @@ void assertStart() throws SQLException, ReflectiveOperationException { Plugins.getMemberAccessor().set(PostgreSQLWALDumper.class.getDeclaredField("logicalReplication"), walDumper, logicalReplication); when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection); when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection); - when(PostgreSQLIngestPositionManager.getUniqueSlotName(eq(pgConnection), anyString())).thenReturn("0101123456"); - when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLIngestPositionManager.getUniqueSlotName(pgConnection, ""), position.getLogSequenceNumber())) - .thenReturn(pgReplicationStream); + when(PostgreSQLSlotNameGenerator.getUniqueSlotName(eq(pgConnection), anyString())).thenReturn("0101123456"); + when(logicalReplication.createReplicationStream(pgConnection, "0101123456", position.getLogSequenceNumber())).thenReturn(pgReplicationStream); ByteBuffer data = ByteBuffer.wrap("table public.t_order_0: DELETE: order_id[integer]:1".getBytes()); when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new IngestException("")); when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));