Skip to content

Commit

Permalink
Refactor DialectIngestPositionManager (#32495)
Browse files Browse the repository at this point in the history
* Refactor MySQLIngestPositionManager

* Refactor DialectIngestPositionManager

* Refactor DialectIngestPositionManager

* Refactor DialectIngestPositionManager
  • Loading branch information
terrymanu authored Aug 13, 2024
1 parent 2765b64 commit 8faf148
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,22 @@
public interface DialectIngestPositionManager extends DatabaseTypedSPI {

/**
* Init position by data source.
* Init position by string data.
*
* @param dataSource data source
* @param slotNameSuffix slot name suffix
* @param data string data
* @return position
* @throws SQLException SQL exception
*/
IngestPosition init(DataSource dataSource, String slotNameSuffix) throws SQLException;
IngestPosition init(String data);

/**
* Init position by string data.
* Init position by data source.
*
* @param data string data
* @param dataSource data source
* @param slotNameSuffix slot name suffix
* @return position
* @throws SQLException SQL exception
*/
IngestPosition init(String data);
IngestPosition init(DataSource dataSource, String slotNameSuffix) throws SQLException;

/**
* Clean up by data source if necessary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,27 @@
*/
public final class MySQLIngestPositionManager implements DialectIngestPositionManager {

@Override
public BinlogPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
return getBinlogPosition(connection);
}
}

@Override
public BinlogPosition init(final String data) {
String[] array = data.split("#");
Preconditions.checkArgument(2 == array.length, "Unknown binlog position: %s", data);
return new BinlogPosition(array[0], Long.parseLong(array[1]));
}

@Override
public BinlogPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
return getBinlogPosition(connection);
}
}

private BinlogPosition getBinlogPosition(final Connection connection) throws SQLException {
String filename;
long position;
try (
PreparedStatement preparedStatement = connection.prepareStatement("SHOW MASTER STATUS");
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
filename = resultSet.getString(1);
position = resultSet.getLong(2);
return new BinlogPosition(resultSet.getString(1), resultSet.getLong(2));
}
return new BinlogPosition(filename, position);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.opengauss.replication.LogSequenceNumber;

import javax.sql.DataSource;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Optional;

/**
Expand All @@ -46,36 +46,43 @@ public final class OpenGaussIngestPositionManager implements DialectIngestPositi

private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";

@Override
public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
createSlotIfNotExist(connection, slotNameSuffix);
return getWalPosition(connection);
}
/**
* Get the unique slot name by connection.
*
* @param connection connection
* @param slotNameSuffix slot name suffix
* @return the unique name by connection
* @throws SQLException failed when getCatalog
*/
public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
// same as PostgreSQL, but length over 64 will throw an exception directly
String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}

@Override
public WALPosition init(final String data) {
return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(data)));
}

/**
* Create logical replication slot if it does not exist.
*
* @param connection connection
* @param slotNameSuffix slotName suffix
* @throws SQLException SQL exception
*/
@Override
public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
createSlotIfNotExist(connection, slotNameSuffix);
return getWALPosition(connection);
}
}

private void createSlotIfNotExist(final Connection connection, final String slotNameSuffix) throws SQLException {
String slotName = getUniqueSlotName(connection, slotNameSuffix);
Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection, slotName);
if (!slotInfo.isPresent()) {
createSlotBySQL(connection, slotName);
createSlot(connection, slotName);
return;
}
if (null == slotInfo.get().getDatabaseName()) {
dropSlotIfExist(connection, slotName);
createSlotBySQL(connection, slotName);
createSlot(connection, slotName);
}
}

Expand All @@ -85,17 +92,15 @@ private Optional<ReplicationSlotInfo> getSlotInfo(final Connection connection, f
preparedStatement.setString(1, slotName);
preparedStatement.setString(2, DECODE_PLUGIN);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (!resultSet.next()) {
return Optional.empty();
}
return Optional.of(new ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2)));
return resultSet.next() ? Optional.of(new ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))) : Optional.empty();
}
}
}

private void createSlotBySQL(final Connection connection, final String slotName) throws SQLException {
String sql = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
private void createSlot(final Connection connection, final String slotName) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT * FROM pg_create_logical_replication_slot(?, ?)")) {
preparedStatement.setString(1, slotName);
preparedStatement.setString(2, DECODE_PLUGIN);
preparedStatement.execute();
} catch (final SQLException ex) {
if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
Expand All @@ -104,10 +109,21 @@ private void createSlotBySQL(final Connection connection, final String slotName)
}
}

private WALPosition getWalPosition(final Connection connection) throws SQLException {
private void dropSlotIfExist(final Connection connection, final String slotName) throws SQLException {
if (!getSlotInfo(connection, slotName).isPresent()) {
log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName);
return;
}
try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT * from pg_drop_replication_slot(?)")) {
preparedStatement.setString(1, slotName);
preparedStatement.execute();
}
}

private WALPosition getWALPosition(final Connection connection) throws SQLException {
try (
PreparedStatement preparedStatement = connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()");
ResultSet resultSet = preparedStatement.executeQuery()) {
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("SELECT PG_CURRENT_XLOG_LOCATION()")) {
resultSet.next();
return new WALPosition(new OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
}
Expand All @@ -120,31 +136,6 @@ public void destroy(final DataSource dataSource, final String slotNameSuffix) th
}
}

private void dropSlotIfExist(final Connection connection, final String slotName) throws SQLException {
if (!getSlotInfo(connection, slotName).isPresent()) {
log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName);
return;
}
String sql = String.format("select * from pg_drop_replication_slot('%s')", slotName);
try (CallableStatement callableStatement = connection.prepareCall(sql)) {
callableStatement.execute();
}
}

/**
* Get the unique slot name by connection.
*
* @param connection connection
* @param slotNameSuffix slot name suffix
* @return the unique name by connection
* @throws SQLException failed when getCatalog
*/
public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
// same as PostgreSQL, but length over 64 will throw an exception directly
String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}

@Override
public String getDatabaseType() {
return "openGauss";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.postgresql.replication.LogSequenceNumber;

import javax.sql.DataSource;
Expand All @@ -43,26 +43,41 @@ public final class PostgreSQLIngestPositionManager implements DialectIngestPosit

private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";

@Override
public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
createSlotIfNotExist(connection, getUniqueSlotName(connection, slotNameSuffix));
return getWalPosition(connection);
}
/**
* Get the unique slot name by connection.
*
* @param connection the connection
* @param slotNameSuffix slot name suffix
* @return the unique name by connection
* @throws SQLException failed when getCatalog
*/
public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
// PostgreSQL slot name maximum length can't exceed 64,automatic truncation when the length exceeds the limit
String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}

@Override
public WALPosition init(final String data) {
return new WALPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(data)));
}

@Override
public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
createSlotIfNotExist(connection, getUniqueSlotName(connection, slotNameSuffix));
return getWALPosition(connection);
}
}

private void createSlotIfNotExist(final Connection connection, final String slotName) throws SQLException {
if (isSlotExisting(connection, slotName)) {
log.info("createSlotIfNotExist, slot exist, slotName={}", slotName);
return;
}
String createSlotSQL = String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
try (PreparedStatement preparedStatement = connection.prepareStatement(createSlotSQL)) {
try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT * FROM pg_create_logical_replication_slot(?, ?)")) {
preparedStatement.setString(1, slotName);
preparedStatement.setString(2, DECODE_PLUGIN);
preparedStatement.execute();
} catch (final SQLException ex) {
if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
Expand All @@ -82,7 +97,7 @@ private boolean isSlotExisting(final Connection connection, final String slotNam
}
}

private WALPosition getWalPosition(final Connection connection) throws SQLException {
private WALPosition getWALPosition(final Connection connection) throws SQLException {
try (
PreparedStatement preparedStatement = connection.prepareStatement(getLogSequenceNumberSQL(connection));
ResultSet resultSet = preparedStatement.executeQuery()) {
Expand Down Expand Up @@ -122,20 +137,6 @@ private void dropSlotIfExist(final Connection connection, final String slotNameS
}
}

/**
* Get the unique slot name by connection.
*
* @param connection the connection
* @param slotNameSuffix slot name suffix
* @return the unique name by connection
* @throws SQLException failed when getCatalog
*/
public static String getUniqueSlotName(final Connection connection, final String slotNameSuffix) throws SQLException {
// PostgreSQL slot name maximum length can't exceed 64,automatic truncation when the length exceeds the limit
String slotName = DigestUtils.md5Hex(String.join("_", connection.getCatalog(), slotNameSuffix).getBytes());
return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}

@Override
public String getDatabaseType() {
return "PostgreSQL";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;

/**
* Base of log sequence number interface.
* Log sequence number.
*/
public interface BaseLogSequenceNumber {

/**
* Convert log sequence number to String.
* Convert log sequence number to string.
*
* @return Long the sequence number of String value
* @return converted string value
*/
String asString();

/**
* Get the binded object.
* Get bound log sequence number.
*
* @return Object the bind log sequence number
* @return bound log sequence number
*/
Object get();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.postgresql.replication.LogSequenceNumber;

/**
* PostgreSQL sequence.
* Log sequence number of PostgreSQL.
*/
@RequiredArgsConstructor
@ToString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ void setUp() throws SQLException {
when(connection.getCatalog()).thenReturn("sharding_db");
when(connection.getMetaData()).thenReturn(databaseMetaData);
PreparedStatement lsn96PreparedStatement = mockPostgreSQL96LSN();
when(connection.prepareStatement(String.format("SELECT * FROM pg_create_logical_replication_slot('%s', '%s')", PostgreSQLIngestPositionManager.getUniqueSlotName(connection, ""),
"test_decoding"))).thenReturn(mock(PreparedStatement.class));
when(connection.prepareStatement("SELECT * FROM pg_create_logical_replication_slot(?, ?)")).thenReturn(mock(PreparedStatement.class));
when(connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()")).thenReturn(lsn96PreparedStatement);
PreparedStatement lsn10PreparedStatement = mockPostgreSQL10LSN();
when(connection.prepareStatement("SELECT PG_CURRENT_WAL_LSN()")).thenReturn(lsn10PreparedStatement);
Expand Down

0 comments on commit 8faf148

Please sign in to comment.