Skip to content

Commit

Permalink
Rename DumperCommonContext
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 3, 2023
1 parent 7026f23 commit b82f734
Show file tree
Hide file tree
Showing 24 changed files with 127 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
@Getter
@Setter
@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
public abstract class DumperCommonContext {
public final class DumperCommonContext {

private String dataSourceName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@
package org.apache.shardingsphere.data.pipeline.api.context.ingest;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;

/**
* Incremental dumper context.
*/
@RequiredArgsConstructor
@Getter
@Setter
@ToString(callSuper = true)
public class IncrementalDumperContext extends DumperCommonContext {
@ToString
public class IncrementalDumperContext {

private final DumperCommonContext commonContext;

private String jobId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.api.context.ingest;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
Expand All @@ -28,10 +29,13 @@
/**
* Inventory dumper context.
*/
@RequiredArgsConstructor
@Getter
@Setter
@ToString(callSuper = true)
public final class InventoryDumperContext extends DumperCommonContext {
@ToString
public final class InventoryDumperContext {

private final DumperCommonContext commonContext;

private String actualTableName;

Expand All @@ -51,14 +55,6 @@ public final class InventoryDumperContext extends DumperCommonContext {

private JobRateLimitAlgorithm rateLimitAlgorithm;

public InventoryDumperContext(final DumperCommonContext dumperContext) {
setDataSourceName(dumperContext.getDataSourceName());
setDataSourceConfig(dumperContext.getDataSourceConfig());
setTableNameMap(dumperContext.getTableNameMap());
setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping());
setTargetTableColumnsMap(dumperContext.getTargetTableColumnsMap());
}

/**
* Has unique key or not.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,21 @@ public InventoryDumper(final InventoryDumperContext dumperContext, final Pipelin
this.dumperContext = dumperContext;
this.channel = channel;
this.dataSource = dataSource;
DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
this.metaDataLoader = metaDataLoader;
}

@Override
protected void runBlocking() {
IngestPosition position = dumperContext.getPosition();
IngestPosition position = dumperContext.getCommonContext().getPosition();
if (position instanceof FinishedPosition) {
log.info("Ignored because of already finished.");
return;
}
PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName());
PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(
dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName());
try (Connection connection = dataSource.getConnection()) {
dump(tableMetaData, connection);
} catch (final SQLException ex) {
Expand All @@ -114,7 +115,7 @@ protected void runBlocking() {
@SuppressWarnings("MagicConstant")
private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
int batchSize = dumperContext.getBatchSize();
DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
if (null != dumperContext.getTransactionIsolation()) {
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
}
Expand Down Expand Up @@ -157,13 +158,13 @@ private String buildInventoryDumpSQL() {
return dumperContext.getQuerySQL();
}
LogicTableName logicTableName = new LogicTableName(dumperContext.getLogicTableName());
String schemaName = dumperContext.getSchemaName(logicTableName);
String schemaName = dumperContext.getCommonContext().getSchemaName(logicTableName);
if (!dumperContext.hasUniqueKey()) {
return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName());
}
PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getPosition();
PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition();
PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
Collection<String> columnNames = dumperContext.getColumnNames(logicTableName);
Collection<String> columnNames = dumperContext.getCommonContext().getColumnNames(logicTableName);
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) {
return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
Expand All @@ -180,7 +181,7 @@ private void setParameters(final PreparedStatement preparedStatement) throws SQL
return;
}
PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperContext.getPosition();
PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition();
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) {
preparedStatement.setObject(1, position.getBeginValue());
preparedStatement.setObject(2, position.getEndValue());
Expand Down Expand Up @@ -213,7 +214,8 @@ private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMeta

private IngestPosition newPosition(final ResultSet resultSet) throws SQLException {
return dumperContext.hasUniqueKey()
? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getPosition()).getEndValue())
? PrimaryKeyPositionFactory.newInstance(
resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition()).getEndValue())
: new PlaceholderPosition();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public final class InventoryRecordsCountCalculator {
* @throws SplitPipelineJobByUniqueKeyException if there's exception from database
*/
public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) {
String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
String schemaName = dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
String actualTableName = dumperContext.getActualTableName();
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(dataSource.getDatabaseType());
Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public List<InventoryTask> splitInventoryData(final InventoryIncrementalJobItemC
long startTimeMillis = System.currentTimeMillis();
InventoryIncrementalProcessContext processContext = jobItemContext.getJobProcessContext();
for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) {
AtomicReference<IngestPosition> position = new AtomicReference<>(each.getPosition());
AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), jobItemContext);
Expand All @@ -111,12 +111,12 @@ public Collection<InventoryDumperContext> splitInventoryDumperContext(final Inve

private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
Collection<InventoryDumperContext> result = new LinkedList<>();
dumperContext.getTableNameMap().forEach((key, value) -> {
InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext);
dumperContext.getCommonContext().getTableNameMap().forEach((key, value) -> {
InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
inventoryDumperContext.getCommonContext().setPosition(new PlaceholderPosition());
// use original table name, for metadata loader, since some database table name case-sensitive
inventoryDumperContext.setActualTableName(key.getOriginal());
inventoryDumperContext.setLogicTableName(value.getOriginal());
inventoryDumperContext.setPosition(new PlaceholderPosition());
inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
result.add(inventoryDumperContext);
Expand All @@ -127,7 +127,7 @@ private Collection<InventoryDumperContext> splitByTable(final InventoryDumperCon
private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext,
final PipelineDataSourceWrapper dataSource) {
if (null == dumperContext.getUniqueKeyColumns()) {
String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
String schemaName = dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
String actualTableName = dumperContext.getActualTableName();
List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
Expand All @@ -140,8 +140,8 @@ private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDump
Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
int i = 0;
for (IngestPosition each : inventoryPositions) {
InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext);
splitDumperContext.setPosition(each);
InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
splitDumperContext.getCommonContext().setPosition(each);
splitDumperContext.setShardingItem(i++);
splitDumperContext.setActualTableName(dumperContext.getActualTableName());
splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());
Expand Down Expand Up @@ -205,7 +205,7 @@ private Range<Long> getUniqueKeyValuesRange(final InventoryIncrementalJobItemCon
String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName();
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName(), uniqueKey);
dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName(), uniqueKey);
try (
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ public static IngestPosition getIncrementalPosition(final JobItemIncrementalTask
return position.get();
}
}
DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public final class PipelineTaskUtils {
* @return inventory task id
*/
public static String generateInventoryTaskId(final InventoryDumperContext inventoryDumperContext) {
String result = String.format("%s.%s", inventoryDumperContext.getDataSourceName(), inventoryDumperContext.getActualTableName());
String result = String.format("%s.%s", inventoryDumperContext.getCommonContext().getDataSourceName(), inventoryDumperContext.getActualTableName());
return result + "#" + inventoryDumperContext.getShardingItem();
}

Expand Down
Loading

0 comments on commit b82f734

Please sign in to comment.