Rename DumperCommonContext
terrymanu committed Nov 3, 2023
1 parent a47c1bc commit b04cb22
Showing 26 changed files with 144 additions and 122 deletions.
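This commit replaces inheritance with composition: DumperCommonContext is no longer the abstract base class of the dumper contexts, but a final class that each context holds as a field and exposes through getCommonContext(). A minimal sketch of the before/after shape (fields elided):

    // Before: each dumper context specialized a shared abstract base.
    public abstract class DumperCommonContext { /* shared dumper state */ }

    public final class IncrementalDumperContext extends DumperCommonContext {
    }

    // After: the shared state is a final class held as a delegate field.
    public final class DumperCommonContext { /* shared dumper state */ }

    public final class IncrementalDumperContext {

        private final DumperCommonContext commonContext;
    }

Call sites change accordingly throughout the diff below, e.g. dumperContext.getPosition() becomes dumperContext.getCommonContext().getPosition().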
DumperCommonContext.java

@@ -39,7 +39,7 @@
 @Getter
 @Setter
 @ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
-public abstract class DumperCommonContext {
+public final class DumperCommonContext {
 
     private String dataSourceName;
 
IncrementalDumperContext.java

@@ -18,16 +18,20 @@
 package org.apache.shardingsphere.data.pipeline.api.context.ingest;
 
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 
 /**
  * Incremental dumper context.
  */
+@RequiredArgsConstructor
 @Getter
 @Setter
-@ToString(callSuper = true)
-public final class IncrementalDumperContext extends DumperCommonContext {
+@ToString
+public final class IncrementalDumperContext {
+
+    private final DumperCommonContext commonContext;
 
     private String jobId;
 
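Because @RequiredArgsConstructor generates a constructor for the single final field, callers now wrap an existing common context instead of populating inherited fields. A minimal usage sketch (the data source name and job id values are illustrative, not from this commit):

    DumperCommonContext commonContext = new DumperCommonContext();
    commonContext.setDataSourceName("ds_0"); // hypothetical data source name

    IncrementalDumperContext dumperContext = new IncrementalDumperContext(commonContext);
    dumperContext.setJobId("j0101"); // hypothetical job id

    // Shared state is reached through the delegate rather than inherited getters:
    String dataSourceName = dumperContext.getCommonContext().getDataSourceName();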
InventoryDumperContext.java

@@ -30,8 +30,10 @@
  */
 @Getter
 @Setter
-@ToString(callSuper = true)
-public final class InventoryDumperContext extends DumperCommonContext {
+@ToString
+public final class InventoryDumperContext {
+
+    private final DumperCommonContext commonContext;
 
     private String actualTableName;
 
@@ -51,12 +53,9 @@ public final class InventoryDumperContext extends DumperCommonContext {
 
     private JobRateLimitAlgorithm rateLimitAlgorithm;
 
-    public InventoryDumperContext(final DumperCommonContext dumperContext) {
-        setDataSourceName(dumperContext.getDataSourceName());
-        setDataSourceConfig(dumperContext.getDataSourceConfig());
-        setTableNameMap(dumperContext.getTableNameMap());
-        setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping());
-        setTargetTableColumnsMap(dumperContext.getTargetTableColumnsMap());
+    public InventoryDumperContext(final DumperCommonContext commonContext) {
+        this.commonContext = commonContext;
+        this.commonContext.setPosition(null);
     }
 
     /**
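Note that the rewritten constructor stores the passed-in reference instead of copying field values, and it also resets the position carried on the shared context. Every InventoryDumperContext built from the same DumperCommonContext therefore aliases one object; a short sketch of what that implies (illustrative only):

    DumperCommonContext common = new DumperCommonContext();
    InventoryDumperContext first = new InventoryDumperContext(common);
    InventoryDumperContext second = new InventoryDumperContext(common);

    // Both contexts delegate to the same instance, so a position written
    // through one context is observable through the other.
    first.getCommonContext().setPosition(new PlaceholderPosition());
    IngestPosition seenBySecond = second.getCommonContext().getPosition(); // the PlaceholderPosition above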
InventoryDumper.java

@@ -89,20 +89,21 @@ public InventoryDumper(final InventoryDumperContext dumperContext, final Pipelin
         this.dumperContext = dumperContext;
         this.channel = channel;
         this.dataSource = dataSource;
-        DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
+        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
         columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
         this.metaDataLoader = metaDataLoader;
     }
 
     @Override
     protected void runBlocking() {
-        IngestPosition position = dumperContext.getPosition();
+        IngestPosition position = dumperContext.getCommonContext().getPosition();
         if (position instanceof FinishedPosition) {
             log.info("Ignored because of already finished.");
             return;
         }
-        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName());
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(
+                dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName());
         try (Connection connection = dataSource.getConnection()) {
             dump(tableMetaData, connection);
         } catch (final SQLException ex) {

@@ -114,7 +115,7 @@ protected void runBlocking() {
     @SuppressWarnings("MagicConstant")
     private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException {
         int batchSize = dumperContext.getBatchSize();
-        DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
+        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         if (null != dumperContext.getTransactionIsolation()) {
             connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
         }

@@ -157,13 +158,13 @@ private String buildInventoryDumpSQL() {
             return dumperContext.getQuerySQL();
         }
         LogicTableName logicTableName = new LogicTableName(dumperContext.getLogicTableName());
-        String schemaName = dumperContext.getSchemaName(logicTableName);
+        String schemaName = dumperContext.getCommonContext().getSchemaName(logicTableName);
         if (!dumperContext.hasUniqueKey()) {
             return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName());
         }
-        PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getPosition();
+        PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition();
         PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
-        Collection<String> columnNames = dumperContext.getColumnNames(logicTableName);
+        Collection<String> columnNames = dumperContext.getCommonContext().getColumnNames(logicTableName);
         if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
             if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) {
                 return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());

@@ -180,7 +181,7 @@ private void setParameters(final PreparedStatement preparedStatement) throws SQL
             return;
         }
         PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
-        PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperContext.getPosition();
+        PrimaryKeyPosition<?> position = (PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition();
         if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) {
             preparedStatement.setObject(1, position.getBeginValue());
             preparedStatement.setObject(2, position.getEndValue());

@@ -213,7 +214,8 @@ private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMeta
 
     private IngestPosition newPosition(final ResultSet resultSet) throws SQLException {
         return dumperContext.hasUniqueKey()
-                ? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getPosition()).getEndValue())
+                ? PrimaryKeyPositionFactory.newInstance(
+                        resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition<?>) dumperContext.getCommonContext().getPosition()).getEndValue())
                 : new PlaceholderPosition();
     }
 
InventoryRecordsCountCalculator.java

@@ -52,7 +52,7 @@ public final class InventoryRecordsCountCalculator {
      * @throws SplitPipelineJobByUniqueKeyException if there's exception from database
      */
     public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) {
-        String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
+        String schemaName = dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
         String actualTableName = dumperContext.getActualTableName();
         PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(dataSource.getDatabaseType());
         Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
InventoryTaskSplitter.java

@@ -84,7 +84,7 @@ public List<InventoryTask> splitInventoryData(final InventoryIncrementalJobItemC
         long startTimeMillis = System.currentTimeMillis();
         InventoryIncrementalProcessContext processContext = jobItemContext.getJobProcessContext();
         for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) {
-            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getPosition());
+            AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
             PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
             Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
             Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), jobItemContext);

@@ -111,12 +111,12 @@ public Collection<InventoryDumperContext> splitInventoryDumperContext(final Inve
 
     private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
         Collection<InventoryDumperContext> result = new LinkedList<>();
-        dumperContext.getTableNameMap().forEach((key, value) -> {
-            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext);
+        dumperContext.getCommonContext().getTableNameMap().forEach((key, value) -> {
+            InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
             // use original table name, for metadata loader, since some database table name case-sensitive
             inventoryDumperContext.setActualTableName(key.getOriginal());
             inventoryDumperContext.setLogicTableName(value.getOriginal());
-            inventoryDumperContext.setPosition(new PlaceholderPosition());
+            inventoryDumperContext.getCommonContext().setPosition(new PlaceholderPosition());
             inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames());
             inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns());
             result.add(inventoryDumperContext);

@@ -127,7 +127,7 @@ private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperCon
     private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext,
                                                                  final PipelineDataSourceWrapper dataSource) {
         if (null == dumperContext.getUniqueKeyColumns()) {
-            String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
+            String schemaName = dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
             String actualTableName = dumperContext.getActualTableName();
             List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
             dumperContext.setUniqueKeyColumns(uniqueKeyColumns);

@@ -140,8 +140,8 @@ private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDump
         Collection<IngestPosition> inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource);
         int i = 0;
         for (IngestPosition each : inventoryPositions) {
-            InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext);
-            splitDumperContext.setPosition(each);
+            InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext());
+            splitDumperContext.getCommonContext().setPosition(each);
             splitDumperContext.setShardingItem(i++);
             splitDumperContext.setActualTableName(dumperContext.getActualTableName());
             splitDumperContext.setLogicTableName(dumperContext.getLogicTableName());

@@ -205,7 +205,7 @@ private Range<Long> getUniqueKeyValuesRange(final InventoryIncrementalJobItemCon
         String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName();
         PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
         String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
-                dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName(), uniqueKey);
+                dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName(), uniqueKey);
         try (
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement();
@@ -131,8 +131,8 @@ public static IngestPosition getIncrementalPosition(final JobItemIncrementalTask
                 return position.get();
             }
         }
-        DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType();
-        DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
+        DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
+        DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
         return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId());
     }
 
MySQLIncrementalDumper.java

@@ -82,12 +82,13 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
 
     public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, final IngestPosition binlogPosition,
                                   final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) {
-        Preconditions.checkArgument(dumperContext.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
+        Preconditions.checkArgument(
+                dumperContext.getCommonContext().getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration");
         this.dumperContext = dumperContext;
         this.binlogPosition = (BinlogPosition) binlogPosition;
         this.channel = channel;
         this.metaDataLoader = metaDataLoader;
-        YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()).getJdbcConfig();
+        YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).getJdbcConfig();
         ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, TypedSPILoader.getService(DatabaseType.class, "MySQL"));
         ConnectionProperties connectionProps = parser.parse(jdbcConfig.getUrl(), null, null);
         ConnectInfo connectInfo = new ConnectInfo(generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword());

@@ -134,7 +135,7 @@ private List<? extends Record> handleEvent(final AbstractBinlogEvent event) {
             return Collections.singletonList(createPlaceholderRecord(event));
         }
         AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event;
-        if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.containsTable(rowsEvent.getTableName())) {
+        if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getCommonContext().containsTable(rowsEvent.getTableName())) {
             return Collections.singletonList(createPlaceholderRecord(event));
         }
         PipelineTableMetaData tableMetaData = getPipelineTableMetaData(rowsEvent.getTableName());

@@ -157,11 +158,11 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent even
     }
 
     private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
-        return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
+        return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getSchemaName(new ActualTableName(actualTableName)), actualTableName);
     }
 
     private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
+        Collection<ColumnName> columnNames = dumperContext.getCommonContext().getColumnNames(event.getTableName());
         List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getAfterRows()) {
             DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length);

@@ -182,7 +183,7 @@ private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final
     }
 
     private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
+        Collection<ColumnName> columnNames = dumperContext.getCommonContext().getColumnNames(event.getTableName());
         List<DataRecord> result = new LinkedList<>();
         for (int i = 0; i < event.getBeforeRows().size(); i++) {
             Serializable[] beforeValues = event.getBeforeRows().get(i);

@@ -206,7 +207,7 @@ private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, fina
     }
 
     private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Collection<ColumnName> columnNames = dumperContext.getColumnNames(event.getTableName());
+        Collection<ColumnName> columnNames = dumperContext.getCommonContext().getColumnNames(event.getTableName());
         List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getBeforeRows()) {
             DataRecord dataRecord = createDataRecord(IngestDataChangeType.DELETE, event, each.length);

@@ -234,7 +235,7 @@ private Serializable handleValue(final PipelineColumnMetaData columnMetaData, fi
     }
 
     private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) {
-        String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal();
+        String tableName = dumperContext.getCommonContext().getLogicTableName(rowsEvent.getTableName()).getOriginal();
         IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
         DataRecord result = new DataRecord(type, tableName, position, columnCount);
         result.setActualTableName(rowsEvent.getTableName());
(Diff truncated: the remaining changed files of the 26 are not shown here.)
