Skip to content

Commit

Permalink
Rename DialectIncrementalPositionManager (apache#32562)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Aug 16, 2024
1 parent 907e140 commit 59455de
Show file tree
Hide file tree
Showing 16 changed files with 35 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import java.sql.SQLException;

/**
* Dialect ingest position manager.
* Dialect incremental position manager.
*/
@SingletonSPI
public interface DialectIngestPositionManager extends DatabaseTypedSPI {
public interface DialectIncrementalPositionManager extends DatabaseTypedSPI {

/**
* Init position by string data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlJobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
Expand Down Expand Up @@ -62,7 +62,7 @@ public JobItemIncrementalTasksProgress swapToObject(final String databaseType, f
return new JobItemIncrementalTasksProgress(null);
}
// TODO consider to remove parameter databaseType
DialectIngestPositionManager positionInitializer = DatabaseTypedSPILoader.getService(DialectIngestPositionManager.class, TypedSPILoader.getService(DatabaseType.class, databaseType));
DialectIncrementalPositionManager positionInitializer = DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, TypedSPILoader.getService(DatabaseType.class, databaseType));
IncrementalTaskProgress taskProgress = new IncrementalTaskProgress(positionInitializer.init(yamlProgress.getPosition()));
taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
return new JobItemIncrementalTasksProgress(taskProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
Expand All @@ -44,11 +44,11 @@ public final class IncrementalTaskPositionManager {

private final DatabaseType databaseType;

private final DialectIngestPositionManager positionInitializer;
private final DialectIncrementalPositionManager positionInitializer;

public IncrementalTaskPositionManager(final DatabaseType databaseType) {
this.databaseType = databaseType;
positionInitializer = DatabaseTypedSPILoader.getService(DialectIngestPositionManager.class, databaseType);
positionInitializer = DatabaseTypedSPILoader.getService(DialectIncrementalPositionManager.class, databaseType);
}

/**
Expand Down Expand Up @@ -90,7 +90,7 @@ public void destroyPosition(final String jobId, final PipelineDataSourceConfigur
}

private void destroyPosition(final String jobId,
final ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig, final DialectIngestPositionManager positionInitializer) throws SQLException {
final ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig, final DialectIncrementalPositionManager positionInitializer) throws SQLException {
for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values()) {
try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
positionInitializer.destroy(dataSource, jobId);
Expand All @@ -99,7 +99,7 @@ private void destroyPosition(final String jobId,
}

private void destroyPosition(final String jobId, final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig,
final DialectIngestPositionManager positionInitializer) throws SQLException {
final DialectIncrementalPositionManager positionInitializer) throws SQLException {
try (
PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(
DataSourcePoolCreator.create((DataSourcePoolProperties) pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import javax.sql.DataSource;

public final class FixtureIngestPositionManager implements DialectIngestPositionManager {
public final class FixtureIncrementalPositionManager implements DialectIncrementalPositionManager {

@Override
public IngestPlaceholderPosition init(final DataSource dataSource, final String slotNameSuffix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
# limitations under the License.
#

org.apache.shardingsphere.data.pipeline.core.ingest.position.FixtureIngestPositionManager
org.apache.shardingsphere.data.pipeline.core.ingest.position.FixtureIncrementalPositionManager
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import com.google.common.base.Preconditions;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;

import javax.sql.DataSource;
import java.sql.Connection;
Expand All @@ -28,9 +28,9 @@
import java.sql.SQLException;

/**
* Ingest position manager for MySQL.
* Incremental position manager for MySQL.
*/
public final class MySQLIngestPositionManager implements DialectIngestPositionManager {
public final class MySQLIncrementalPositionManager implements DialectIncrementalPositionManager {

@Override
public MySQLBinlogPosition init(final String data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
# limitations under the License.
#

org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.position.MySQLIngestPositionManager
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.position.MySQLIncrementalPositionManager
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.position;

import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogPosition;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.position.MySQLIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.position.MySQLIncrementalPositionManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -58,7 +58,7 @@ void setUp() throws SQLException {

@Test
void assertGetCurrentPosition() throws SQLException {
MySQLIngestPositionManager positionInitializer = new MySQLIngestPositionManager();
MySQLIncrementalPositionManager positionInitializer = new MySQLIncrementalPositionManager();
MySQLBinlogPosition actual = positionInitializer.init(dataSource, "");
assertThat(actual.getFilename(), is(LOG_FILE_NAME));
assertThat(actual.getPosition(), is(LOG_POSITION));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.opengauss.ingest.incremental.position;

import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;
import org.apache.shardingsphere.data.pipeline.opengauss.ingest.incremental.wal.decode.OpenGaussLogSequenceNumber;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.position.slot.PostgreSQLSlotManager;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.WALPosition;
Expand All @@ -30,9 +30,9 @@
import java.sql.SQLException;

/**
* Ingest position manager for openGauss.
* Incremental position manager for openGauss.
*/
public final class OpenGaussIngestPositionManager implements DialectIngestPositionManager {
public final class OpenGaussIncrementalPositionManager implements DialectIncrementalPositionManager {

private final PostgreSQLSlotManager slotManager = new PostgreSQLSlotManager("mppdb_decoding");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
# limitations under the License.
#

org.apache.shardingsphere.data.pipeline.opengauss.ingest.incremental.position.OpenGaussIngestPositionManager
org.apache.shardingsphere.data.pipeline.opengauss.ingest.incremental.position.OpenGaussIncrementalPositionManager
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.position;

import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.position.slot.PostgreSQLSlotManager;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.WALPosition;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.decode.PostgreSQLLogSequenceNumber;
Expand All @@ -32,9 +32,9 @@
import java.sql.SQLException;

/**
* Ingest position manager for PostgreSQL.
* Incremental position manager for PostgreSQL.
*/
public final class PostgreSQLIngestPositionManager implements DialectIngestPositionManager {
public final class PostgreSQLIncrementalPositionManager implements DialectIncrementalPositionManager {

private final PostgreSQLSlotManager slotManager = new PostgreSQLSlotManager("test_decoding");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
# limitations under the License.
#

org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.position.PostgreSQLIngestPositionManager
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.position.PostgreSQLIncrementalPositionManager
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.position.PostgreSQLIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.position.PostgreSQLIncrementalPositionManager;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.position.slot.PostgreSQLSlotNameGenerator;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.PostgreSQLLogicalReplication;
import org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.WALPosition;
Expand Down Expand Up @@ -63,7 +63,7 @@
import static org.mockito.Mockito.when;

@ExtendWith(AutoMockExtension.class)
@StaticMockSettings({PostgreSQLIngestPositionManager.class, PostgreSQLSlotNameGenerator.class})
@StaticMockSettings({PostgreSQLIncrementalPositionManager.class, PostgreSQLSlotNameGenerator.class})
@MockitoSettings(strictness = Strictness.LENIENT)
class PostgreSQLIncrementalDumperTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ void assertGetCurrentPositionOnPostgreSQL96() throws SQLException {
mockSlotExistsOrNot(false);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(6);
WALPosition actual = new PostgreSQLIngestPositionManager().init(dataSource, "");
WALPosition actual = new PostgreSQLIncrementalPositionManager().init(dataSource, "");
assertThat(actual.getLogSequenceNumber().get(), is(LogSequenceNumber.valueOf(POSTGRESQL_96_LSN)));
}

@Test
void assertGetCurrentPositionOnPostgreSQL10() throws SQLException {
mockSlotExistsOrNot(false);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(10);
WALPosition actual = new PostgreSQLIngestPositionManager().init(dataSource, "");
WALPosition actual = new PostgreSQLIncrementalPositionManager().init(dataSource, "");
assertThat(actual.getLogSequenceNumber().get(), is(LogSequenceNumber.valueOf(POSTGRESQL_10_LSN)));
}

Expand All @@ -93,7 +93,7 @@ void assertGetCurrentPositionThrowException() throws SQLException {
mockSlotExistsOrNot(false);
when(databaseMetaData.getDatabaseMajorVersion()).thenReturn(9);
when(databaseMetaData.getDatabaseMinorVersion()).thenReturn(4);
assertThrows(RuntimeException.class, () -> new PostgreSQLIngestPositionManager().init(dataSource, ""));
assertThrows(RuntimeException.class, () -> new PostgreSQLIncrementalPositionManager().init(dataSource, ""));
}

@SneakyThrows(SQLException.class)
Expand Down Expand Up @@ -130,7 +130,7 @@ void assertDestroyWhenSlotExists() throws SQLException {
mockSlotExistsOrNot(true);
PreparedStatement preparedStatement = mock(PreparedStatement.class);
when(connection.prepareStatement("SELECT pg_drop_replication_slot(?)")).thenReturn(preparedStatement);
new PostgreSQLIngestPositionManager().destroy(dataSource, "");
new PostgreSQLIncrementalPositionManager().destroy(dataSource, "");
verify(preparedStatement).execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.ingest.position;

import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;

import javax.sql.DataSource;

/**
* Ingest position manager for H2.
* Incremental position manager for H2.
*/
public final class H2IngestPositionManager implements DialectIngestPositionManager {
public final class H2IncrementalPositionManager implements DialectIncrementalPositionManager {

@Override
public IngestPlaceholderPosition init(final DataSource dataSource, final String slotNameSuffix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
# limitations under the License.
#

org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.ingest.position.H2IngestPositionManager
org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.ingest.position.H2IncrementalPositionManager

0 comments on commit 59455de

Please sign in to comment.