From 8faf1485f5983a5045791c9700aa470747e0ed0f Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Tue, 13 Aug 2024 22:00:01 +0800 Subject: [PATCH] Refactor DialectIngestPositionManager (#32495) * Refactor MySQLIngestPositionManager * Refactor DialectIngestPositionManager * Refactor DialectIngestPositionManager * Refactor DialectIngestPositionManager --- .../DialectIngestPositionManager.java | 16 ++-- .../ingest/MySQLIngestPositionManager.java | 20 ++-- .../OpenGaussIngestPositionManager.java | 95 +++++++++---------- .../PostgreSQLIngestPositionManager.java | 49 +++++----- .../wal/decode/BaseLogSequenceNumber.java | 10 +- .../decode/PostgreSQLLogSequenceNumber.java | 2 +- .../PostgreSQLIngestPositionManagerTest.java | 3 +- 7 files changed, 91 insertions(+), 104 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DialectIngestPositionManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DialectIngestPositionManager.java index 02210c7ebddaa..657bb8f4ab961 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DialectIngestPositionManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DialectIngestPositionManager.java @@ -30,22 +30,22 @@ public interface DialectIngestPositionManager extends DatabaseTypedSPI { /** - * Init position by data source. + * Init position by string data. * - * @param dataSource data source - * @param slotNameSuffix slot name suffix + * @param data string data * @return position - * @throws SQLException SQL exception */ - IngestPosition init(DataSource dataSource, String slotNameSuffix) throws SQLException; + IngestPosition init(String data); /** - * Init position by string data. + * Init position by data source. * - * @param data string data + * @param dataSource data source + * @param slotNameSuffix slot name suffix * @return position + * @throws SQLException SQL exception */ - IngestPosition init(String data); + IngestPosition init(DataSource dataSource, String slotNameSuffix) throws SQLException; /** * Clean up by data source if necessary. diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java index cbc3170f5843d..aff2ef03b2641 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java @@ -32,13 +32,6 @@ */ public final class MySQLIngestPositionManager implements DialectIngestPositionManager { - @Override - public BinlogPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException { - try (Connection connection = dataSource.getConnection()) { - return getBinlogPosition(connection); - } - } - @Override public BinlogPosition init(final String data) { String[] array = data.split("#"); @@ -46,17 +39,20 @@ public BinlogPosition init(final String data) { return new BinlogPosition(array[0], Long.parseLong(array[1])); } + @Override + public BinlogPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException { + try (Connection connection = dataSource.getConnection()) { + return getBinlogPosition(connection); + } + } + private BinlogPosition getBinlogPosition(final Connection connection) throws SQLException { - String filename; - long position; try ( PreparedStatement preparedStatement = connection.prepareStatement("SHOW MASTER STATUS"); ResultSet resultSet = preparedStatement.executeQuery()) { resultSet.next(); - filename = resultSet.getString(1); - position = resultSet.getLong(2); + return new BinlogPosition(resultSet.getString(1), resultSet.getLong(2)); } - return new BinlogPosition(filename, position); } @Override diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java index 9b885d432d4af..776932449b7b1 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java @@ -19,18 +19,18 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager; import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition; -import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager; import org.opengauss.replication.LogSequenceNumber; import javax.sql.DataSource; -import java.sql.CallableStatement; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.Optional; /** @@ -46,12 +46,18 @@ public final class OpenGaussIngestPositionManager implements DialectIngestPositi private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710"; - @Override - public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException { - try (Connection connection = dataSource.getConnection()) { - createSlotIfNotExist(connection, slotNameSuffix); - return getWalPosition(connection); - } + /** + * Get the unique slot name by connection. + * + * @param connection connection + * @param slotNameSuffix slot name suffix + * @return the unique name by connection + * @throws SQLException failed when getCatalog + */ + public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException { + // same as PostgreSQL, but length over 64 will throw an exception directly + String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes()); + return String.format("%s_%s", SLOT_NAME_PREFIX, slotName); } @Override @@ -59,23 +65,24 @@ public WALPosition init(final String data) { return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(data))); } - /** - * Create logical replication slot if it does not exist. - * - * @param connection connection - * @param slotNameSuffix slotName suffix - * @throws SQLException SQL exception - */ + @Override + public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException { + try (Connection connection = dataSource.getConnection()) { + createSlotIfNotExist(connection, slotNameSuffix); + return getWALPosition(connection); + } + } + private void createSlotIfNotExist(final Connection connection, final String slotNameSuffix) throws SQLException { String slotName = getUniqueSlotName(connection, slotNameSuffix); Optional slotInfo = getSlotInfo(connection, slotName); if (!slotInfo.isPresent()) { - createSlotBySQL(connection, slotName); + createSlot(connection, slotName); return; } if (null == slotInfo.get().getDatabaseName()) { dropSlotIfExist(connection, slotName); - createSlotBySQL(connection, slotName); + createSlot(connection, slotName); } } @@ -85,17 +92,15 @@ private Optional getSlotInfo(final Connection connection, f preparedStatement.setString(1, slotName); preparedStatement.setString(2, DECODE_PLUGIN); try (ResultSet resultSet = preparedStatement.executeQuery()) { - if (!resultSet.next()) { - return Optional.empty(); - } - return Optional.of(new ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))); + return resultSet.next() ? Optional.of(new ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))) : Optional.empty(); } } } - private void createSlotBySQL(final Connection connection, final String slotName) throws SQLException { - String sql = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN); - try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + private void createSlot(final Connection connection, final String slotName) throws SQLException { + try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT * FROM pg_create_logical_replication_slot(?, ?)")) { + preparedStatement.setString(1, slotName); + preparedStatement.setString(2, DECODE_PLUGIN); preparedStatement.execute(); } catch (final SQLException ex) { if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) { @@ -104,10 +109,21 @@ private void createSlotBySQL(final Connection connection, final String slotName) } } - private WALPosition getWalPosition(final Connection connection) throws SQLException { + private void dropSlotIfExist(final Connection connection, final String slotName) throws SQLException { + if (!getSlotInfo(connection, slotName).isPresent()) { + log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName); + return; + } + try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT * from pg_drop_replication_slot(?)")) { + preparedStatement.setString(1, slotName); + preparedStatement.execute(); + } + } + + private WALPosition getWALPosition(final Connection connection) throws SQLException { try ( - PreparedStatement preparedStatement = connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()"); - ResultSet resultSet = preparedStatement.executeQuery()) { + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery("SELECT PG_CURRENT_XLOG_LOCATION()")) { resultSet.next(); return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1)))); } @@ -120,31 +136,6 @@ public void destroy(final DataSource dataSource, final String slotNameSuffix) th } } - private void dropSlotIfExist(final Connection connection, final String slotName) throws SQLException { - if (!getSlotInfo(connection, slotName).isPresent()) { - log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName); - return; - } - String sql = String.format("select * from pg_drop_replication_slot('%s')", slotName); - try (CallableStatement callableStatement = connection.prepareCall(sql)) { - callableStatement.execute(); - } - } - - /** - * Get the unique slot name by connection. - * - * @param connection connection - * @param slotNameSuffix slot name suffix - * @return the unique name by connection - * @throws SQLException failed when getCatalog - */ - public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException { - // same as PostgreSQL, but length over 64 will throw an exception directly - String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes()); - return String.format("%s_%s", SLOT_NAME_PREFIX, slotName); - } - @Override public String getDatabaseType() { return "openGauss"; diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java index d3db37369bd22..389ba0ace3064 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java @@ -20,9 +20,9 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition; import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber; -import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager; import org.postgresql.replication.LogSequenceNumber; import javax.sql.DataSource; @@ -43,12 +43,18 @@ public final class PostgreSQLIngestPositionManager implements DialectIngestPosit private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710"; - @Override - public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException { - try (Connection connection = dataSource.getConnection()) { - createSlotIfNotExist(connection, getUniqueSlotName(connection, slotNameSuffix)); - return getWalPosition(connection); - } + /** + * Get the unique slot name by connection. + * + * @param connection the connection + * @param slotNameSuffix slot name suffix + * @return the unique name by connection + * @throws SQLException failed when getCatalog + */ + public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException { + // PostgreSQL slot name maximum length can't exceed 64,automatic truncation when the length exceeds the limit + String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes()); + return String.format("%s_%s", SLOT_NAME_PREFIX, slotName); } @Override @@ -56,13 +62,22 @@ public WALPosition init(final String data) { return new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(data))); } + @Override + public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException { + try (Connection connection = dataSource.getConnection()) { + createSlotIfNotExist(connection, getUniqueSlotName(connection, slotNameSuffix)); + return getWALPosition(connection); + } + } + private void createSlotIfNotExist(final Connection connection, final String slotName) throws SQLException { if (isSlotExisting(connection, slotName)) { log.info("createSlotIfNotExist, slot exist, slotName={}", slotName); return; } - String createSlotSQL = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN); - try (PreparedStatement preparedStatement = connection.prepareStatement(createSlotSQL)) { + try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT * FROM pg_create_logical_replication_slot(?, ?)")) { + preparedStatement.setString(1, slotName); + preparedStatement.setString(2, DECODE_PLUGIN); preparedStatement.execute(); } catch (final SQLException ex) { if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) { @@ -82,7 +97,7 @@ private boolean isSlotExisting(final Connection connection, final String slotNam } } - private WALPosition getWalPosition(final Connection connection) throws SQLException { + private WALPosition getWALPosition(final Connection connection) throws SQLException { try ( PreparedStatement preparedStatement = connection.prepareStatement(getLogSequenceNumberSQL(connection)); ResultSet resultSet = preparedStatement.executeQuery()) { @@ -122,20 +137,6 @@ private void dropSlotIfExist(final Connection connection, final String slotNameS } } - /** - * Get the unique slot name by connection. - * - * @param connection the connection - * @param slotNameSuffix slot name suffix - * @return the unique name by connection - * @throws SQLException failed when getCatalog - */ - public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException { - // PostgreSQL slot name maximum length can't exceed 64,automatic truncation when the length exceeds the limit - String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes()); - return String.format("%s_%s", SLOT_NAME_PREFIX, slotName); - } - @Override public String getDatabaseType() { return "PostgreSQL"; diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java index 5a92412bc967c..81409bd19657f 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java @@ -18,21 +18,21 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode; /** - * Base of log sequence number interface. + * Log sequence number. */ public interface BaseLogSequenceNumber { /** - * Convert log sequence number to String. + * Convert log sequence number to string. * - * @return Long the sequence number of String value + * @return converted string value */ String asString(); /** - * Get the binded object. + * Get bound log sequence number. * - * @return Object the bind log sequence number + * @return bound log sequence number */ Object get(); } diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java index 2b15f5d09da1a..5392ec97454e2 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java @@ -22,7 +22,7 @@ import org.postgresql.replication.LogSequenceNumber; /** - * PostgreSQL sequence. + * Log sequence number of PostgreSQL. */ @RequiredArgsConstructor @ToString diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java index b7e7122b7d853..045aa5b3025b6 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java @@ -65,8 +65,7 @@ void setUp() throws SQLException { when(connection.getCatalog()).thenReturn("sharding_db"); when(connection.getMetaData()).thenReturn(databaseMetaData); PreparedStatement lsn96PreparedStatement = mockPostgreSQL96LSN(); - when(connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", PostgreSQLIngestPositionManager.getUniqueSlotName(connection, ""), - "test_decoding"))).thenReturn(mock(PreparedStatement.class)); + when(connection.prepareStatement("SELECT * FROM pg_create_logical_replication_slot(?, ?)")).thenReturn(mock(PreparedStatement.class)); when(connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()")).thenReturn(lsn96PreparedStatement); PreparedStatement lsn10PreparedStatement = mockPostgreSQL10LSN(); when(connection.prepareStatement("SELECT PG_CURRENT_WAL_LSN()")).thenReturn(lsn10PreparedStatement);