Skip to content

Commit

Permalink
Add PostgreSQLSlotNameGenerator (#32496)
Browse files Browse the repository at this point in the history
* Add PostgreSQLSlotNameGenerator

* Add PostgreSQLSlotNameGenerator
  • Loading branch information
terrymanu authored Aug 13, 2024
1 parent 8faf148 commit 725a2c9
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.shardingsphere.data.pipeline.opengauss.ingest;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLSlotNameGenerator;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.opengauss.replication.LogSequenceNumber;
Expand All @@ -40,26 +40,10 @@
@Slf4j
public final class OpenGaussIngestPositionManager implements DialectIngestPositionManager {

private static final String SLOT_NAME_PREFIX = "pipeline";

private static final String DECODE_PLUGIN = "mppdb_decoding";

private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";

/**
* Get the unique slot name by connection.
*
* @param connection connection
* @param slotNameSuffix slot name suffix
* @return the unique name by connection
* @throws SQLException failed when getCatalog
*/
public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
// same as PostgreSQL, but length over 64 will throw an exception directly
String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}

@Override
public WALPosition init(final String data) {
return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(data)));
Expand All @@ -74,7 +58,7 @@ public WALPosition init(final DataSource dataSource, final String slotNameSuffix
}

private void createSlotIfNotExist(final Connection connection, final String slotNameSuffix) throws SQLException {
String slotName = getUniqueSlotName(connection, slotNameSuffix);
String slotName = PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix);
Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection, slotName);
if (!slotInfo.isPresent()) {
createSlot(connection, slotName);
Expand Down Expand Up @@ -132,7 +116,7 @@ private WALPosition getWALPosition(final Connection connection) throws SQLExcept
@Override
public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
dropSlotIfExist(connection, getUniqueSlotName(connection, slotNameSuffix));
dropSlotIfExist(connection, PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussTimestampUtils;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLSlotNameGenerator;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
Expand Down Expand Up @@ -125,7 +126,7 @@ private void dump() throws SQLException {
int majorVersion = getMajorVersion();
try (PgConnection connection = getReplicationConnectionUnwrap()) {
stream = logicalReplication.createReplicationStream(connection, walPosition.get().getLogSequenceNumber(),
OpenGaussIngestPositionManager.getUniqueSlotName(connection, dumperContext.getJobId()), majorVersion);
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, dumperContext.getJobId()), majorVersion);
DecodingPlugin decodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(connection.getTimestampUtils()), decodeWithTX, majorVersion >= 3);
while (isRunning()) {
ByteBuffer message = stream.readPending();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
Expand All @@ -37,26 +36,10 @@
@Slf4j
public final class PostgreSQLIngestPositionManager implements DialectIngestPositionManager {

private static final String SLOT_NAME_PREFIX = "pipeline";

private static final String DECODE_PLUGIN = "test_decoding";

private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";

/**
* Get the unique slot name by connection.
*
* @param connection the connection
* @param slotNameSuffix slot name suffix
* @return the unique name by connection
* @throws SQLException failed when getCatalog
*/
public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
// PostgreSQL slot name maximum length can't exceed 64,automatic truncation when the length exceeds the limit
String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}

@Override
public WALPosition init(final String data) {
return new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(data)));
Expand All @@ -65,7 +48,7 @@ public WALPosition init(final String data) {
@Override
public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
createSlotIfNotExist(connection, getUniqueSlotName(connection, slotNameSuffix));
createSlotIfNotExist(connection, PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
return getWALPosition(connection);
}
}
Expand Down Expand Up @@ -124,7 +107,7 @@ public void destroy(final DataSource dataSource, final String slotNameSuffix) th
}

private void dropSlotIfExist(final Connection connection, final String slotNameSuffix) throws SQLException {
String slotName = getUniqueSlotName(connection, slotNameSuffix);
String slotName = PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix);
if (!isSlotExisting(connection, slotName)) {
log.info("dropSlotIfExist, slot not exist, slotName={}", slotName);
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.postgresql.ingest;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.codec.digest.DigestUtils;

import java.sql.Connection;
import java.sql.SQLException;

/**
* PostgreSQL slot name generator.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PostgreSQLSlotNameGenerator {

private static final String SLOT_NAME_PREFIX = "pipeline";

/**
* Get unique slot name by connection.
*
* @param connection connection
* @param slotNameSuffix slot name suffix
* @return unique name by connection
* @throws SQLException failed when get catalog
*/
public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private void dump() throws SQLException {
// TODO use unified PgConnection
try (
Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig());
PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLIngestPositionManager.getUniqueSlotName(connection, dumperContext.getJobId()),
PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, dumperContext.getJobId()),
walPosition.get().getLogSequenceNumber())) {
PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils());
DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.postgresql.ingest;

import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.SQLException;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class PostgreSQLSlotNameGeneratorTest {

@Test
void assertGetUniqueSlotName() throws SQLException {
Connection connection = mock(Connection.class);
when(connection.getCatalog()).thenReturn("foo_catalog");
assertThat(PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, "foo_slot"), is("pipeline_9a2b4a79ce8b4fca2835b1e947c446eb"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -39,6 +39,8 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.internal.configuration.plugins.Plugins;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
Expand All @@ -59,7 +61,8 @@
import static org.mockito.Mockito.when;

@ExtendWith(AutoMockExtension.class)
@StaticMockSettings(PostgreSQLIngestPositionManager.class)
@StaticMockSettings({PostgreSQLIngestPositionManager.class, PostgreSQLSlotNameGenerator.class})
@MockitoSettings(strictness = Strictness.LENIENT)
class PostgreSQLWALDumperTest {

@Mock
Expand Down Expand Up @@ -131,9 +134,8 @@ void assertStart() throws SQLException, ReflectiveOperationException {
Plugins.getMemberAccessor().set(PostgreSQLWALDumper.class.getDeclaredField("logicalReplication"), walDumper, logicalReplication);
when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection);
when(pgConnection.unwrap(PgConnection.class)).thenReturn(pgConnection);
when(PostgreSQLIngestPositionManager.getUniqueSlotName(eq(pgConnection), anyString())).thenReturn("0101123456");
when(logicalReplication.createReplicationStream(pgConnection, PostgreSQLIngestPositionManager.getUniqueSlotName(pgConnection, ""), position.getLogSequenceNumber()))
.thenReturn(pgReplicationStream);
when(PostgreSQLSlotNameGenerator.getUniqueSlotName(eq(pgConnection), anyString())).thenReturn("0101123456");
when(logicalReplication.createReplicationStream(pgConnection, "0101123456", position.getLogSequenceNumber())).thenReturn(pgReplicationStream);
ByteBuffer data = ByteBuffer.wrap("table public.t_order_0: DELETE: order_id[integer]:1".getBytes());
when(pgReplicationStream.readPending()).thenReturn(null).thenReturn(data).thenThrow(new IngestException(""));
when(pgReplicationStream.getLastReceiveLSN()).thenReturn(LogSequenceNumber.valueOf(101L));
Expand Down

0 comments on commit 725a2c9

Please sign in to comment.