Skip to content

Commit

Permalink
Rename IncrementalDumperConfiguration
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 2, 2023
1 parent ee1c67b commit dd52409
Show file tree
Hide file tree
Showing 24 changed files with 77 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import lombok.ToString;

/**
* Dumper configuration.
* Incremental dumper configuration.
*/
@Getter
@Setter
@ToString(callSuper = true)
public class DumperConfiguration extends BaseDumperConfiguration {
public class IncrementalDumperConfiguration extends BaseDumperConfiguration {

private String jobId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
Expand All @@ -34,13 +34,13 @@ public interface IncrementalDumperCreator extends DatabaseTypedSPI {
/**
* Create incremental dumper.
*
* @param dumperConfig dumper configuration
* @param config incremental dumper configuration
* @param position position
* @param channel channel
* @param metaDataLoader meta data loader
* @return incremental dumper
*/
IncrementalDumper createIncrementalDumper(DumperConfiguration dumperConfig, IngestPosition position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);
IncrementalDumper createIncrementalDumper(IncrementalDumperConfiguration config, IngestPosition position, PipelineChannel channel, PipelineTableMetaDataLoader metaDataLoader);

/**
* Whether support incremental dump.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.common.config.ingest;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;

/**
Expand All @@ -31,5 +31,5 @@ public interface IncrementalDumperConfigurationCreator {
* @param jobDataNodeLine job data node line
* @return dumper configuration
*/
DumperConfiguration createDumperConfiguration(JobDataNodeLine jobDataNodeLine);
IncrementalDumperConfiguration createDumperConfiguration(JobDataNodeLine jobDataNodeLine);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
Expand Down Expand Up @@ -123,7 +123,7 @@ public static void prepareTargetTables(final DatabaseType databaseType, final Pr
* @return ingest position
* @throws SQLException sql exception
*/
public static IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final DumperConfiguration dumperConfig,
public static IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperConfiguration dumperConfig,
final PipelineDataSourceManager dataSourceManager) throws SQLException {
if (null != initIncremental) {
Optional<IngestPosition> position = initIncremental.getIncrementalPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.h2.ingest.dumper;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
Expand All @@ -30,7 +30,7 @@
public final class H2IncrementalDumperCreator implements IncrementalDumperCreator {

@Override
public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
throw new UnsupportedOperationException("H2 database can not support incremental dump.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
Expand Down Expand Up @@ -68,7 +68,7 @@
@Slf4j
public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor implements IncrementalDumper {

private final DumperConfiguration dumperConfig;
private final IncrementalDumperConfiguration dumperConfig;

private final BinlogPosition binlogPosition;

Expand All @@ -80,7 +80,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl

private final String catalog;

public MySQLIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition binlogPosition,
public MySQLIncrementalDumper(final IncrementalDumperConfiguration dumperConfig, final IngestPosition binlogPosition,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
Preconditions.checkArgument(dumperConfig.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
this.dumperConfig = dumperConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.mysql.ingest.dumper;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
Expand All @@ -31,9 +31,9 @@
public final class MySQLIncrementalDumperCreator implements IncrementalDumperCreator {

@Override
public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
return new MySQLIncrementalDumper(dumperConfig, position, channel, metaDataLoader);
return new MySQLIncrementalDumper(config, position, channel, metaDataLoader);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
Expand Down Expand Up @@ -75,7 +75,7 @@
@SuppressWarnings("unchecked")
class MySQLIncrementalDumperTest {

private DumperConfiguration dumperConfig;
private IncrementalDumperConfiguration dumperConfig;

private MySQLIncrementalDumper incrementalDumper;

Expand All @@ -93,8 +93,8 @@ void setUp() {
when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(pipelineTableMetaData);
}

private DumperConfiguration mockDumperConfiguration() {
DumperConfiguration result = new DumperConfiguration();
private IncrementalDumperConfiguration mockDumperConfiguration() {
IncrementalDumperConfiguration result = new IncrementalDumperConfiguration();
result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
Expand All @@ -103,7 +103,7 @@ private DumperConfiguration mockDumperConfiguration() {
}

@SneakyThrows(SQLException.class)
private void initTableData(final DumperConfiguration dumperConfig) {
private void initTableData(final IncrementalDumperConfiguration dumperConfig) {
try (
PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
Expand Down Expand Up @@ -58,7 +58,7 @@
@Slf4j
public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper {

private final DumperConfiguration dumperConfig;
private final IncrementalDumperConfiguration dumperConfig;

private final AtomicReference<WALPosition> walPosition;

Expand All @@ -72,7 +72,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen

private List<AbstractRowEvent> rowEvents = new LinkedList<>();

public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
public OpenGaussWALDumper(final IncrementalDumperConfiguration dumperConfig, final IngestPosition position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.opengauss.ingest.dumper;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
Expand All @@ -31,9 +31,9 @@
public final class OpenGaussIncrementalDumperCreator implements IncrementalDumperCreator {

@Override
public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
return new OpenGaussWALDumper(dumperConfig, position, channel, metaDataLoader);
return new OpenGaussWALDumper(config, position, channel, metaDataLoader);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
Expand Down Expand Up @@ -60,7 +60,7 @@
@Slf4j
public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor implements IncrementalDumper {

private final DumperConfiguration dumperConfig;
private final IncrementalDumperConfiguration dumperConfig;

private final AtomicReference<WALPosition> walPosition;

Expand All @@ -74,7 +74,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme

private List<AbstractRowEvent> rowEvents = new LinkedList<>();

public PostgreSQLWALDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
public PostgreSQLWALDumper(final IncrementalDumperConfiguration dumperConfig, final IngestPosition position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.postgresql.ingest.dumper;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
Expand All @@ -31,9 +31,9 @@
public final class PostgreSQLIncrementalDumperCreator implements IncrementalDumperCreator {

@Override
public IncrementalDumper createIncrementalDumper(final DumperConfiguration dumperConfig, final IngestPosition position,
public IncrementalDumper createIncrementalDumper(final IncrementalDumperConfiguration config, final IngestPosition position,
final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
return new PostgreSQLWALDumper(dumperConfig, position, channel, metaDataLoader);
return new PostgreSQLWALDumper(config, position, channel, metaDataLoader);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;

import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
Expand All @@ -43,11 +43,11 @@
*/
public final class WALEventConverter {

private final DumperConfiguration dumperConfig;
private final IncrementalDumperConfiguration dumperConfig;

private final PipelineTableMetaDataLoader metaDataLoader;

public WALEventConverter(final DumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) {
public WALEventConverter(final IncrementalDumperConfiguration dumperConfig, final PipelineTableMetaDataLoader metaDataLoader) {
this.dumperConfig = dumperConfig;
this.metaDataLoader = metaDataLoader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;

import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.IncrementalDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
Expand Down Expand Up @@ -72,7 +72,7 @@ class PostgreSQLWALDumperTest {

private WALPosition position;

private DumperConfiguration dumperConfig;
private IncrementalDumperConfiguration dumperConfig;

private PostgreSQLWALDumper walDumper;

Expand Down Expand Up @@ -103,8 +103,8 @@ private void createTable(final String jdbcUrl, final String username, final Stri
}
}

private DumperConfiguration createDumperConfiguration(final String jdbcUrl, final String username, final String password) {
DumperConfiguration result = new DumperConfiguration();
private IncrementalDumperConfiguration createDumperConfiguration(final String jdbcUrl, final String username, final String password) {
IncrementalDumperConfiguration result = new IncrementalDumperConfiguration();
result.setJobId("0101123456");
result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")));
Expand Down
Loading

0 comments on commit dd52409

Please sign in to comment.