From b04cb220db1d913fea8e02367cae5e26e9db24e6 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Fri, 3 Nov 2023 23:44:28 +0800 Subject: [PATCH] Rename DumperCommonContext --- .../context/ingest/DumperCommonContext.java | 2 +- .../ingest/IncrementalDumperContext.java | 8 +++++-- .../ingest/InventoryDumperContext.java | 15 ++++++------ .../pipeline/core/dumper/InventoryDumper.java | 20 ++++++++-------- .../InventoryRecordsCountCalculator.java | 2 +- .../core/preparer/InventoryTaskSplitter.java | 16 ++++++------- .../preparer/PipelineJobPreparerUtils.java | 4 ++-- .../mysql/ingest/MySQLIncrementalDumper.java | 17 +++++++------- .../ingest/MySQLIncrementalDumperTest.java | 23 ++++++++++--------- .../opengauss/ingest/OpenGaussWALDumper.java | 4 ++-- .../ingest/PostgreSQLWALDumper.java | 4 ++-- .../ingest/wal/WALEventConverter.java | 8 +++---- .../ingest/PostgreSQLWALDumperTest.java | 15 +++++++----- .../ingest/wal/WALEventConverterTest.java | 17 +++++++------- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 14 ++++++----- .../cdc/context/CDCJobItemContext.java | 4 ++-- .../cdc/core/prepare/CDCJobPreparer.java | 20 ++++++++-------- .../migration/api/impl/MigrationJobAPI.java | 8 ++++--- ...rationIncrementalDumperContextCreator.java | 12 ++++++---- .../context/MigrationJobItemContext.java | 2 +- .../prepare/MigrationJobPreparer.java | 16 +++++++------ .../GovernanceRepositoryAPIImplTest.java | 4 ++-- .../prepare/InventoryTaskSplitterTest.java | 18 +++++++-------- .../core/task/IncrementalTaskTest.java | 2 +- .../pipeline/core/task/InventoryTaskTest.java | 9 ++++---- .../MigrationDataConsistencyCheckerTest.java | 2 +- 26 files changed, 144 insertions(+), 122 deletions(-) diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/DumperCommonContext.java index 31d5f33abbed8d..0b6dc68a51d912 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/DumperCommonContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/DumperCommonContext.java @@ -39,7 +39,7 @@ @Getter @Setter @ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"}) -public abstract class DumperCommonContext { +public final class DumperCommonContext { private String dataSourceName; diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/IncrementalDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/IncrementalDumperContext.java index 6f568ebceaeac3..00e4a9eade4969 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/IncrementalDumperContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/IncrementalDumperContext.java @@ -18,16 +18,20 @@ package org.apache.shardingsphere.data.pipeline.api.context.ingest; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; /** * Incremental dumper context. */ +@RequiredArgsConstructor @Getter @Setter -@ToString(callSuper = true) -public final class IncrementalDumperContext extends DumperCommonContext { +@ToString +public final class IncrementalDumperContext { + + private final DumperCommonContext commonContext; private String jobId; diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/InventoryDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/InventoryDumperContext.java index 5b4b723b4487c4..5ba40767dd0f01 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/InventoryDumperContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/ingest/InventoryDumperContext.java @@ -30,8 +30,10 @@ */ @Getter @Setter -@ToString(callSuper = true) -public final class InventoryDumperContext extends DumperCommonContext { +@ToString +public final class InventoryDumperContext { + + private final DumperCommonContext commonContext; private String actualTableName; @@ -51,12 +53,9 @@ public final class InventoryDumperContext extends DumperCommonContext { private JobRateLimitAlgorithm rateLimitAlgorithm; - public InventoryDumperContext(final DumperCommonContext dumperContext) { - setDataSourceName(dumperContext.getDataSourceName()); - setDataSourceConfig(dumperContext.getDataSourceConfig()); - setTableNameMap(dumperContext.getTableNameMap()); - setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping()); - setTargetTableColumnsMap(dumperContext.getTargetTableColumnsMap()); + public InventoryDumperContext(final DumperCommonContext commonContext) { + this.commonContext = commonContext; + this.commonContext.setPosition(null); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java index d1ca7a44e2e35f..3aa8eb8b6e495a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java @@ -89,7 +89,7 @@ public InventoryDumper(final InventoryDumperContext dumperContext, final Pipelin this.dumperContext = dumperContext; this.channel = channel; this.dataSource = dataSource; - DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType(); + DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType(); inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType); columnValueReaderEngine = new ColumnValueReaderEngine(databaseType); this.metaDataLoader = metaDataLoader; @@ -97,12 +97,13 @@ public InventoryDumper(final InventoryDumperContext dumperContext, final Pipelin @Override protected void runBlocking() { - IngestPosition position = dumperContext.getPosition(); + IngestPosition position = dumperContext.getCommonContext().getPosition(); if (position instanceof FinishedPosition) { log.info("Ignored because of already finished."); return; } - PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName()); + PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData( + dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName()); try (Connection connection = dataSource.getConnection()) { dump(tableMetaData, connection); } catch (final SQLException ex) { @@ -114,7 +115,7 @@ protected void runBlocking() { @SuppressWarnings("MagicConstant") private void dump(final PipelineTableMetaData tableMetaData, final Connection connection) throws SQLException { int batchSize = dumperContext.getBatchSize(); - DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType(); + DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType(); if (null != dumperContext.getTransactionIsolation()) { connection.setTransactionIsolation(dumperContext.getTransactionIsolation()); } @@ -157,13 +158,13 @@ private String buildInventoryDumpSQL() { return dumperContext.getQuerySQL(); } LogicTableName logicTableName = new LogicTableName(dumperContext.getLogicTableName()); - String schemaName = dumperContext.getSchemaName(logicTableName); + String schemaName = dumperContext.getCommonContext().getSchemaName(logicTableName); if (!dumperContext.hasUniqueKey()) { return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName()); } - PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) dumperContext.getPosition(); + PrimaryKeyPosition primaryKeyPosition = (PrimaryKeyPosition) dumperContext.getCommonContext().getPosition(); PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0); - Collection columnNames = dumperContext.getColumnNames(logicTableName); + Collection columnNames = dumperContext.getCommonContext().getColumnNames(logicTableName); if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) { if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) { return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName()); @@ -180,7 +181,7 @@ private void setParameters(final PreparedStatement preparedStatement) throws SQL return; } PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0); - PrimaryKeyPosition position = (PrimaryKeyPosition) dumperContext.getPosition(); + PrimaryKeyPosition position = (PrimaryKeyPosition) dumperContext.getCommonContext().getPosition(); if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) && null != position.getBeginValue() && null != position.getEndValue()) { preparedStatement.setObject(1, position.getBeginValue()); preparedStatement.setObject(2, position.getEndValue()); @@ -213,7 +214,8 @@ private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMeta private IngestPosition newPosition(final ResultSet resultSet) throws SQLException { return dumperContext.hasUniqueKey() - ? PrimaryKeyPositionFactory.newInstance(resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition) dumperContext.getPosition()).getEndValue()) + ? PrimaryKeyPositionFactory.newInstance( + resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyPosition) dumperContext.getCommonContext().getPosition()).getEndValue()) : new PlaceholderPosition(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java index 10d164bc483eb8..12bd4dffe0b7fd 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java @@ -52,7 +52,7 @@ public final class InventoryRecordsCountCalculator { * @throws SplitPipelineJobByUniqueKeyException if there's exception from database */ public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) { - String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())); + String schemaName = dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName())); String actualTableName = dumperContext.getActualTableName(); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(dataSource.getDatabaseType()); Optional sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java index 87af351be19e2c..037351614c7455 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java @@ -84,7 +84,7 @@ public List splitInventoryData(final InventoryIncrementalJobItemC long startTimeMillis = System.currentTimeMillis(); InventoryIncrementalProcessContext processContext = jobItemContext.getJobProcessContext(); for (InventoryDumperContext each : splitInventoryDumperContext(jobItemContext)) { - AtomicReference position = new AtomicReference<>(each.getPosition()); + AtomicReference position = new AtomicReference<>(each.getCommonContext().getPosition()); PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position); Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader()); Importer importer = new SingleChannelConsumerImporter(channel, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), jobItemContext); @@ -111,12 +111,12 @@ public Collection splitInventoryDumperContext(final Inve private Collection splitByTable(final InventoryDumperContext dumperContext) { Collection result = new LinkedList<>(); - dumperContext.getTableNameMap().forEach((key, value) -> { - InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext); + dumperContext.getCommonContext().getTableNameMap().forEach((key, value) -> { + InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext.getCommonContext()); // use original table name, for metadata loader, since some database table name case-sensitive inventoryDumperContext.setActualTableName(key.getOriginal()); inventoryDumperContext.setLogicTableName(value.getOriginal()); - inventoryDumperContext.setPosition(new PlaceholderPosition()); + inventoryDumperContext.getCommonContext().setPosition(new PlaceholderPosition()); inventoryDumperContext.setInsertColumnNames(dumperContext.getInsertColumnNames()); inventoryDumperContext.setUniqueKeyColumns(dumperContext.getUniqueKeyColumns()); result.add(inventoryDumperContext); @@ -127,7 +127,7 @@ private Collection splitByTable(final InventoryDumperCon private Collection splitByPrimaryKey(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext, final PipelineDataSourceWrapper dataSource) { if (null == dumperContext.getUniqueKeyColumns()) { - String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())); + String schemaName = dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName())); String actualTableName = dumperContext.getActualTableName(); List uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader()); dumperContext.setUniqueKeyColumns(uniqueKeyColumns); @@ -140,8 +140,8 @@ private Collection splitByPrimaryKey(final InventoryDump Collection inventoryPositions = getInventoryPositions(dumperContext, jobItemContext, dataSource); int i = 0; for (IngestPosition each : inventoryPositions) { - InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext); - splitDumperContext.setPosition(each); + InventoryDumperContext splitDumperContext = new InventoryDumperContext(dumperContext.getCommonContext()); + splitDumperContext.getCommonContext().setPosition(each); splitDumperContext.setShardingItem(i++); splitDumperContext.setActualTableName(dumperContext.getActualTableName()); splitDumperContext.setLogicTableName(dumperContext.getLogicTableName()); @@ -205,7 +205,7 @@ private Range getUniqueKeyValuesRange(final InventoryIncrementalJobItemCon String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName(); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType()); String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL( - dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName(), uniqueKey); + dumperContext.getCommonContext().getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName(), uniqueKey); try ( Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java index d00ef8e2506225..6e0ac2ee63b7be 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java @@ -131,8 +131,8 @@ public static IngestPosition getIncrementalPosition(final JobItemIncrementalTask return position.get(); } } - DatabaseType databaseType = dumperContext.getDataSourceConfig().getDatabaseType(); - DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); + DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType(); + DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()); return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId()); } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java index d1fb5ca8ea4897..8c24dff7eaf7b7 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java @@ -82,12 +82,13 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl public MySQLIncrementalDumper(final IncrementalDumperContext dumperContext, final IngestPosition binlogPosition, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - Preconditions.checkArgument(dumperContext.getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration"); + Preconditions.checkArgument( + dumperContext.getCommonContext().getDataSourceConfig() instanceof StandardPipelineDataSourceConfiguration, "MySQLBinlogDumper only support StandardPipelineDataSourceConfiguration"); this.dumperContext = dumperContext; this.binlogPosition = (BinlogPosition) binlogPosition; this.channel = channel; this.metaDataLoader = metaDataLoader; - YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()).getJdbcConfig(); + YamlJdbcConfiguration jdbcConfig = ((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).getJdbcConfig(); ConnectionPropertiesParser parser = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, TypedSPILoader.getService(DatabaseType.class, "MySQL")); ConnectionProperties connectionProps = parser.parse(jdbcConfig.getUrl(), null, null); ConnectInfo connectInfo = new ConnectInfo(generateServerId(), connectionProps.getHostname(), connectionProps.getPort(), jdbcConfig.getUsername(), jdbcConfig.getPassword()); @@ -134,7 +135,7 @@ private List handleEvent(final AbstractBinlogEvent event) { return Collections.singletonList(createPlaceholderRecord(event)); } AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event; - if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.containsTable(rowsEvent.getTableName())) { + if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getCommonContext().containsTable(rowsEvent.getTableName())) { return Collections.singletonList(createPlaceholderRecord(event)); } PipelineTableMetaData tableMetaData = getPipelineTableMetaData(rowsEvent.getTableName()); @@ -157,11 +158,11 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent even } private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) { - return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName); + return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getSchemaName(new ActualTableName(actualTableName)), actualTableName); } private List handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) { - Collection columnNames = dumperContext.getColumnNames(event.getTableName()); + Collection columnNames = dumperContext.getCommonContext().getColumnNames(event.getTableName()); List result = new LinkedList<>(); for (Serializable[] each : event.getAfterRows()) { DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length); @@ -182,7 +183,7 @@ private boolean isColumnUnneeded(final Collection columnNames, final } private List handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) { - Collection columnNames = dumperContext.getColumnNames(event.getTableName()); + Collection columnNames = dumperContext.getCommonContext().getColumnNames(event.getTableName()); List result = new LinkedList<>(); for (int i = 0; i < event.getBeforeRows().size(); i++) { Serializable[] beforeValues = event.getBeforeRows().get(i); @@ -206,7 +207,7 @@ private List handleUpdateRowsEvent(final UpdateRowsEvent event, fina } private List handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) { - Collection columnNames = dumperContext.getColumnNames(event.getTableName()); + Collection columnNames = dumperContext.getCommonContext().getColumnNames(event.getTableName()); List result = new LinkedList<>(); for (Serializable[] each : event.getBeforeRows()) { DataRecord dataRecord = createDataRecord(IngestDataChangeType.DELETE, event, each.length); @@ -234,7 +235,7 @@ private Serializable handleValue(final PipelineColumnMetaData columnMetaData, fi } private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) { - String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal(); + String tableName = dumperContext.getCommonContext().getLogicTableName(rowsEvent.getTableName()).getOriginal(); IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId()); DataRecord result = new DataRecord(type, tableName, position, columnCount); result.setActualTableName(rowsEvent.getTableName()); diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index a2df83744e1962..fad27b7b2f510c 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -19,6 +19,7 @@ import lombok.SneakyThrows; import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.DumperCommonContext; import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; @@ -85,7 +86,7 @@ class MySQLIncrementalDumperTest { void setUp() { dumperContext = mockDumperContext(); initTableData(dumperContext); - dumperContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root")); + dumperContext.getCommonContext().setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root")); PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class); SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10000, new EmptyAckCallback()); incrementalDumper = new MySQLIncrementalDumper(dumperContext, new BinlogPosition("binlog-000001", 4L, 0L), channel, metaDataLoader); @@ -94,19 +95,19 @@ void setUp() { } private IncrementalDumperContext mockDumperContext() { - IncrementalDumperContext result = new IncrementalDumperContext(); - result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root")); - result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); - result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap())); - result.setTargetTableColumnsMap(Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id")))); - return result; + DumperCommonContext commonContext = new DumperCommonContext(); + commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root")); + commonContext.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); + commonContext.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap())); + commonContext.setTargetTableColumnsMap(Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id")))); + return new IncrementalDumperContext(commonContext); } @SneakyThrows(SQLException.class) private void initTableData(final IncrementalDumperContext dumperContext) { try ( PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); - PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); + PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()); Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { statement.execute("DROP TABLE IF EXISTS t_order"); @@ -138,7 +139,7 @@ void assertWriteRowsEventWithCustomColumns() throws ReflectiveOperationException } private void assertWriteRowsEvent0(final Map> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException { - dumperContext.setTargetTableColumnsMap(targetTableColumnsMap); + dumperContext.getCommonContext().setTargetTableColumnsMap(targetTableColumnsMap); WriteRowsEvent rowsEvent = new WriteRowsEvent(); rowsEvent.setDatabaseName(""); rowsEvent.setTableName("t_order"); @@ -166,7 +167,7 @@ void assertUpdateRowsEventWithCustomColumns() throws ReflectiveOperationExceptio } private void assertUpdateRowsEvent0(final Map> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException { - dumperContext.setTargetTableColumnsMap(targetTableColumnsMap); + dumperContext.getCommonContext().setTargetTableColumnsMap(targetTableColumnsMap); UpdateRowsEvent rowsEvent = new UpdateRowsEvent(); rowsEvent.setDatabaseName("test"); rowsEvent.setTableName("t_order"); @@ -191,7 +192,7 @@ void assertDeleteRowsEventWithCustomColumns() throws ReflectiveOperationExceptio } private void assertDeleteRowsEvent0(final Map> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException { - dumperContext.setTargetTableColumnsMap(targetTableColumnsMap); + dumperContext.getCommonContext().setTargetTableColumnsMap(targetTableColumnsMap); DeleteRowsEvent rowsEvent = new DeleteRowsEvent(); rowsEvent.setDatabaseName(""); rowsEvent.setTableName("t_order"); diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java index 15626615e503d8..852dc512c952e4 100644 --- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java +++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java @@ -74,7 +74,7 @@ public final class OpenGaussWALDumper extends AbstractLifecycleExecutor implemen public OpenGaussWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getDataSourceConfig().getClass()), + ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()), () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration")); this.dumperContext = dumperContext; walPosition = new AtomicReference<>((WALPosition) position); @@ -137,7 +137,7 @@ private void dump() throws SQLException { } private PgConnection getReplicationConnectionUnwrap() throws SQLException { - return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()).unwrap(PgConnection.class); + return logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()).unwrap(PgConnection.class); } private void processEventWithTX(final AbstractWALEvent event) { diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java index f6901c32906a38..ce7ecfa4aa4ffa 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java @@ -76,7 +76,7 @@ public final class PostgreSQLWALDumper extends AbstractLifecycleExecutor impleme public PostgreSQLWALDumper(final IncrementalDumperContext dumperContext, final IngestPosition position, final PipelineChannel channel, final PipelineTableMetaDataLoader metaDataLoader) { - ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getDataSourceConfig().getClass()), + ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperContext.getCommonContext().getDataSourceConfig().getClass()), () -> new UnsupportedSQLOperationException("PostgreSQLWALDumper only support PipelineDataSourceConfiguration")); this.dumperContext = dumperContext; walPosition = new AtomicReference<>((WALPosition) position); @@ -111,7 +111,7 @@ protected void runBlocking() { private void dump() throws SQLException { // TODO use unified PgConnection try ( - Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig()); + Connection connection = logicalReplication.createConnection((StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig()); PGReplicationStream stream = logicalReplication.createReplicationStream(connection, PostgreSQLPositionInitializer.getUniqueSlotName(connection, dumperContext.getJobId()), walPosition.get().getLogSequenceNumber())) { PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(connection.unwrap(PgConnection.class).getTimestampUtils()); diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java index aa96452a9b6b8a..04e84d36e81f25 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java +++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java @@ -82,7 +82,7 @@ public Record convert(final AbstractWALEvent event) { private boolean filter(final AbstractWALEvent event) { if (event instanceof AbstractRowEvent) { AbstractRowEvent rowEvent = (AbstractRowEvent) event; - return !dumperContext.containsTable(rowEvent.getTableName()); + return !dumperContext.getCommonContext().containsTable(rowEvent.getTableName()); } return false; } @@ -92,7 +92,7 @@ private PlaceholderRecord createPlaceholderRecord(final AbstractWALEvent event) } private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) { - return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName); + return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getSchemaName(new ActualTableName(actualTableName)), actualTableName); } private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) { @@ -120,7 +120,7 @@ private DataRecord handleDeleteRowEvent(final DeleteRowEvent event, final Pipeli } private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) { - String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal(); + String tableName = dumperContext.getCommonContext().getLogicTableName(rowsEvent.getTableName()).getOriginal(); DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount); result.setActualTableName(rowsEvent.getTableName()); result.setCsn(rowsEvent.getCsn()); @@ -128,7 +128,7 @@ private DataRecord createDataRecord(final String type, final AbstractRowEvent ro } private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final String actualTableName, final List values) { - Collection columnNames = dumperContext.getColumnNames(actualTableName); + Collection columnNames = dumperContext.getCommonContext().getColumnNames(actualTableName); for (int i = 0, count = values.size(); i < count; i++) { PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1); if (isColumnUnneeded(columnNames, columnMetaData.getName())) { diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index 62052be5ea927f..8bec26922306d9 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest; import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.DumperCommonContext; import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; @@ -89,7 +90,8 @@ void setUp() { String password = "root"; createTable(jdbcUrl, username, password); dumperContext = createDumperContext(jdbcUrl, username, password); - walDumper = new PostgreSQLWALDumper(dumperContext, position, channel, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()))); + walDumper = new PostgreSQLWALDumper(dumperContext, position, channel, + new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()))); } private void createTable(final String jdbcUrl, final String username, final String password) { @@ -104,11 +106,12 @@ private void createTable(final String jdbcUrl, final String username, final Stri } private IncrementalDumperContext createDumperContext(final String jdbcUrl, final String username, final String password) { - IncrementalDumperContext result = new IncrementalDumperContext(); + DumperCommonContext commonContext = new DumperCommonContext(); + commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password)); + commonContext.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))); + commonContext.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap())); + IncrementalDumperContext result = new IncrementalDumperContext(commonContext); result.setJobId("0101123456"); - result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password)); - result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))); - result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap())); return result; } @@ -119,7 +122,7 @@ void tearDown() { @Test void assertStart() throws SQLException, ReflectiveOperationException { - StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getDataSourceConfig(); + StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) dumperContext.getCommonContext().getDataSourceConfig(); try { Plugins.getMemberAccessor().set(PostgreSQLWALDumper.class.getDeclaredField("logicalReplication"), walDumper, logicalReplication); when(logicalReplication.createConnection(dataSourceConfig)).thenReturn(pgConnection); diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index 389b3f724bdcae..6a830c9e26a825 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -19,6 +19,7 @@ import lombok.SneakyThrows; import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.DumperCommonContext; import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord; @@ -83,24 +84,24 @@ class WALEventConverterTest { void setUp() { dumperContext = mockDumperContext(); PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); - walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()))); + walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()))); initTableData(dumperContext); pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList()); } private IncrementalDumperContext mockDumperContext() { - IncrementalDumperContext result = new IncrementalDumperContext(); - result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root")); - result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); - result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap())); - return result; + DumperCommonContext commonContext = new DumperCommonContext(); + commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root")); + commonContext.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))); + commonContext.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap())); + return new IncrementalDumperContext(commonContext); } @SneakyThrows(SQLException.class) private void initTableData(final IncrementalDumperContext dumperContext) { try ( PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); - PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); + PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()); Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { statement.execute("DROP TABLE IF EXISTS t_order"); @@ -132,7 +133,7 @@ void assertWriteRowEventWithCustomColumns() throws ReflectiveOperationException } private void assertWriteRowEvent0(final Map> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException { - dumperContext.setTargetTableColumnsMap(targetTableColumnsMap); + dumperContext.getCommonContext().setTargetTableColumnsMap(targetTableColumnsMap); WriteRowEvent rowsEvent = new WriteRowEvent(); rowsEvent.setSchemaName(""); rowsEvent.setTableName("t_order"); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index b7771bc7d8da3e..38d8a8635fe351 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.DumperCommonContext; import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; @@ -192,7 +193,7 @@ private static InventoryIncrementalJobItemProgress getInventoryIncrementalJobIte final IncrementalDumperContext incrementalDumperContext) throws SQLException { InventoryIncrementalJobItemProgress result = new InventoryIncrementalJobItemProgress(); result.setSourceDatabaseType(jobConfig.getSourceDatabaseType()); - result.setDataSourceName(incrementalDumperContext.getDataSourceName()); + result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName()); IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, incrementalDumperContext, dataSourceManager)); result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress)); return result; @@ -292,12 +293,13 @@ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jo dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName())))); String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName(); StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName); - IncrementalDumperContext result = new IncrementalDumperContext(); + DumperCommonContext commonContext = new DumperCommonContext(); + commonContext.setDataSourceName(dataSourceName); + commonContext.setDataSourceConfig(actualDataSourceConfig); + commonContext.setTableNameMap(tableNameMap); + commonContext.setTableNameSchemaNameMapping(tableNameSchemaNameMapping); + IncrementalDumperContext result = new IncrementalDumperContext(commonContext); result.setJobId(jobConfig.getJobId()); - result.setDataSourceName(dataSourceName); - result.setDataSourceConfig(actualDataSourceConfig); - result.setTableNameMap(tableNameMap); - result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping); result.setDecodeWithTX(jobConfig.isDecodeWithTX()); return result; } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java index 23bc51e1f51e9e..47e6acba40cbb5 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java @@ -80,7 +80,7 @@ public final class CDCJobItemContext implements InventoryIncrementalJobItemConte @Override protected PipelineDataSourceWrapper initialize() { - return dataSourceManager.getDataSource(taskConfig.getDumperContext().getDataSourceConfig()); + return dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig()); } }; @@ -99,7 +99,7 @@ public String getJobId() { @Override public String getDataSourceName() { - return taskConfig.getDumperContext().getDataSourceName(); + return taskConfig.getDumperContext().getCommonContext().getDataSourceName(); } @Override diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index ff30ed40a93f29..b36b9f86553719 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -104,7 +104,8 @@ private void initIncrementalPosition(final CDCJobItemContext jobItemContext) { CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental(); try { - taskConfig.getDumperContext().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager())); + taskConfig.getDumperContext().getCommonContext().setPosition( + PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager())); } catch (final SQLException ex) { throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex); } @@ -115,19 +116,19 @@ private void initInventoryTasks(final CDCJobItemContext jobItemContext, final At CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); ImporterConfiguration importerConfig = taskConfig.getImporterConfig(); CDCProcessContext processContext = jobItemContext.getJobProcessContext(); - for (InventoryDumperContext each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext()), importerConfig) + for (InventoryDumperContext each : new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()), importerConfig) .splitInventoryDumperContext(jobItemContext)) { - AtomicReference position = new AtomicReference<>(each.getPosition()); + AtomicReference position = new AtomicReference<>(each.getCommonContext().getPosition()); PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position); channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext)); Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader()); Importer importer = importerUsed.get() ? null : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(), - needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperContext().getDataSourceConfig().getDatabaseType())), + needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())), importerConfig.getRateLimitAlgorithm()); jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(), processContext.getInventoryImporterExecuteEngine(), dumper, importer, position)); - if (!(each.getPosition() instanceof FinishedPosition)) { + if (!(each.getCommonContext().getPosition() instanceof FinishedPosition)) { importerUsed.set(true); } } @@ -146,16 +147,17 @@ private void initIncrementalTask(final CDCJobItemContext jobItemContext, final A CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); IncrementalDumperContext dumperContext = taskConfig.getDumperContext(); ImporterConfiguration importerConfig = taskConfig.getImporterConfig(); - IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getPosition(), jobItemContext.getInitProgress()); + IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress()); PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), jobItemContext.getJobProcessContext().getPipelineChannelCreator(), taskProgress); channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext)); - Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getDataSourceConfig().getDatabaseType()) - .createIncrementalDumper(dumperContext, dumperContext.getPosition(), channel, jobItemContext.getSourceMetaDataLoader()); + Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()) + .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, jobItemContext.getSourceMetaDataLoader()); boolean needSorting = needSorting(ImporterType.INCREMENTAL, hasGlobalCSN(importerConfig.getDataSourceConfig().getDatabaseType())); Importer importer = importerUsed.get() ? null : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 300, TimeUnit.MILLISECONDS, jobItemContext.getSink(), needSorting, importerConfig.getRateLimitAlgorithm()); - PipelineTask incrementalTask = new CDCIncrementalTask(dumperContext.getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress); + PipelineTask incrementalTask = new CDCIncrementalTask( + dumperContext.getCommonContext().getDataSourceName(), jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(), dumper, importer, taskProgress); jobItemContext.getIncrementalTasks().add(incrementalTask); importerUsed.set(true); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index aa1a8e4b141c7b..5bab5d2559f744 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -264,12 +264,14 @@ public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfig MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig; IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator( jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); - CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getTableNameSchemaNameMapping()); + CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getCommonContext().getTableNameSchemaNameMapping()); Set targetTableNames = jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet()); Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames); - ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getTableNameSchemaNameMapping()); - MigrationTaskConfiguration result = new MigrationTaskConfiguration(incrementalDumperContext.getDataSourceName(), createTableConfig, incrementalDumperContext, importerConfig); + ImporterConfiguration importerConfig = buildImporterConfiguration( + jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableNameSchemaNameMapping()); + MigrationTaskConfiguration result = new MigrationTaskConfiguration( + incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfig, incrementalDumperContext, importerConfig); log.info("buildTaskConfiguration, result={}", result); return result; } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java index bf4acb9166ccf4..0c713e550af1f4 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java @@ -19,6 +19,7 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping; +import org.apache.shardingsphere.data.pipeline.api.context.ingest.DumperCommonContext; import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; @@ -48,12 +49,13 @@ public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDat private IncrementalDumperContext buildDumperContext(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource, final Map tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) { - IncrementalDumperContext result = new IncrementalDumperContext(); + DumperCommonContext dumperCommonContext = new DumperCommonContext(); + dumperCommonContext.setDataSourceName(dataSourceName); + dumperCommonContext.setDataSourceConfig(sourceDataSource); + dumperCommonContext.setTableNameMap(tableNameMap); + dumperCommonContext.setTableNameSchemaNameMapping(tableNameSchemaNameMapping); + IncrementalDumperContext result = new IncrementalDumperContext(dumperCommonContext); result.setJobId(jobId); - result.setDataSourceName(dataSourceName); - result.setDataSourceConfig(sourceDataSource); - result.setTableNameMap(tableNameMap); - result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping); return result; } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java index 1fe9a79b3d9f0f..a4eaa8330c792a 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java @@ -80,7 +80,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte @Override protected PipelineDataSourceWrapper initialize() { - return dataSourceManager.getDataSource(taskConfig.getDumperContext().getDataSourceConfig()); + return dataSourceManager.getDataSource(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig()); } }; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java index 694424af48585a..50e7363c7ed8c8 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java @@ -88,7 +88,8 @@ public final class MigrationJobPreparer { * @throws SQLException SQL exception */ public void prepare(final MigrationJobItemContext jobItemContext) throws SQLException { - ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(jobItemContext.getTaskConfig().getDumperContext().getDataSourceConfig().getClass()), + ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class + .equals(jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()), () -> new UnsupportedSQLOperationException("Migration inventory dumper only support StandardPipelineDataSourceConfiguration")); PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(), Collections.singleton(jobItemContext.getSourceDataSource())); if (jobItemContext.isStopping()) { @@ -169,14 +170,15 @@ private void prepareIncremental(final MigrationJobItemContext jobItemContext) { MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig(); JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental(); try { - taskConfig.getDumperContext().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager())); + taskConfig.getDumperContext().getCommonContext().setPosition( + PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager())); } catch (final SQLException ex) { throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex); } } private void initInventoryTasks(final MigrationJobItemContext jobItemContext) { - InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext()); + InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext()); InventoryTaskSplitter inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), inventoryDumperContext, jobItemContext.getTaskConfig().getImporterConfig()); jobItemContext.getInventoryTasks().addAll(inventoryTaskSplitter.splitInventoryData(jobItemContext)); } @@ -188,12 +190,12 @@ private void initIncrementalTasks(final MigrationJobItemContext jobItemContext) IncrementalDumperContext dumperContext = taskConfig.getDumperContext(); ImporterConfiguration importerConfig = taskConfig.getImporterConfig(); ExecuteEngine incrementalExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalExecuteEngine(); - IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getPosition(), jobItemContext.getInitProgress()); + IncrementalTaskProgress taskProgress = PipelineTaskUtils.createIncrementalTaskProgress(dumperContext.getCommonContext().getPosition(), jobItemContext.getInitProgress()); PipelineChannel channel = PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), pipelineChannelCreator, taskProgress); - Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getDataSourceConfig().getDatabaseType()) - .createIncrementalDumper(dumperContext, dumperContext.getPosition(), channel, sourceMetaDataLoader); + Dumper dumper = DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()) + .createIncrementalDumper(dumperContext, dumperContext.getCommonContext().getPosition(), channel, sourceMetaDataLoader); Collection importers = createImporters(importerConfig, jobItemContext.getSink(), channel, jobItemContext); - PipelineTask incrementalTask = new IncrementalTask(dumperContext.getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress); + PipelineTask incrementalTask = new IncrementalTask(dumperContext.getCommonContext().getDataSourceName(), incrementalExecuteEngine, dumper, importers, taskProgress); jobItemContext.getIncrementalTasks().add(incrementalTask); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java index a592fe3249c222..be5cb9b7df7904 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/job/service/GovernanceRepositoryAPIImplTest.java @@ -177,8 +177,8 @@ private MigrationJobItemContext mockJobItemContext() { } private InventoryTask mockInventoryTask(final MigrationTaskConfiguration taskConfig) { - InventoryDumperContext dumperContext = new InventoryDumperContext(taskConfig.getDumperContext()); - dumperContext.setPosition(new PlaceholderPosition()); + InventoryDumperContext dumperContext = new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()); + dumperContext.getCommonContext().setPosition(new PlaceholderPosition()); dumperContext.setActualTableName("t_order"); dumperContext.setLogicTableName("t_order"); dumperContext.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData())); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java index aaa5e01acdd7c2..eee62dda236406 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java @@ -66,7 +66,7 @@ static void beforeClass() { @BeforeEach void setUp() { initJobItemContext(); - dumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext()); + dumperContext = new InventoryDumperContext(jobItemContext.getTaskConfig().getDumperContext().getCommonContext()); PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(1, "order_id", Types.INTEGER, "int", false, true, true); dumperContext.setUniqueKeyColumns(Collections.singletonList(columnMetaData)); inventoryTaskSplitter = new InventoryTaskSplitter(jobItemContext.getSourceDataSource(), dumperContext, jobItemContext.getTaskConfig().getImporterConfig()); @@ -85,7 +85,7 @@ void tearDown() { @Test void assertSplitInventoryDataWithEmptyTable() throws SQLException { - initEmptyTablePrimaryEnvironment(dumperContext); + initEmptyTablePrimaryEnvironment(dumperContext.getCommonContext()); List actual = inventoryTaskSplitter.splitInventoryData(jobItemContext); assertThat(actual.size(), is(1)); InventoryTask task = actual.get(0); @@ -95,7 +95,7 @@ void assertSplitInventoryDataWithEmptyTable() throws SQLException { @Test void assertSplitInventoryDataWithIntPrimary() throws SQLException { - initIntPrimaryEnvironment(dumperContext); + initIntPrimaryEnvironment(dumperContext.getCommonContext()); List actual = inventoryTaskSplitter.splitInventoryData(jobItemContext); assertThat(actual.size(), is(10)); InventoryTask task = actual.get(9); @@ -105,7 +105,7 @@ void assertSplitInventoryDataWithIntPrimary() throws SQLException { @Test void assertSplitInventoryDataWithCharPrimary() throws SQLException { - initCharPrimaryEnvironment(dumperContext); + initCharPrimaryEnvironment(dumperContext.getCommonContext()); List actual = inventoryTaskSplitter.splitInventoryData(jobItemContext); assertThat(actual.size(), is(1)); assertThat(actual.get(0).getTaskId(), is("ds_0.t_order#0")); @@ -116,15 +116,15 @@ void assertSplitInventoryDataWithCharPrimary() throws SQLException { @Test void assertSplitInventoryDataWithoutPrimaryButWithUniqueIndex() throws SQLException { - initUniqueIndexOnNotNullColumnEnvironment(dumperContext); + initUniqueIndexOnNotNullColumnEnvironment(dumperContext.getCommonContext()); List actual = inventoryTaskSplitter.splitInventoryData(jobItemContext); assertThat(actual.size(), is(1)); } @Test void assertSplitInventoryDataWithMultipleColumnsKey() throws SQLException { - initUnionPrimaryEnvironment(dumperContext); - try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())) { + initUnionPrimaryEnvironment(dumperContext.getCommonContext()); + try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())) { List uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new StandardPipelineTableMetaDataLoader(dataSource)); dumperContext.setUniqueKeyColumns(uniqueKeyColumns); List actual = inventoryTaskSplitter.splitInventoryData(jobItemContext); @@ -134,8 +134,8 @@ void assertSplitInventoryDataWithMultipleColumnsKey() throws SQLException { @Test void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException { - initNoPrimaryEnvironment(dumperContext); - try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())) { + initNoPrimaryEnvironment(dumperContext.getCommonContext()); + try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())) { List uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(null, "t_order", new StandardPipelineTableMetaDataLoader(dataSource)); assertTrue(uniqueKeyColumns.isEmpty()); List inventoryTasks = inventoryTaskSplitter.splitInventoryData(jobItemContext); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java index 0e70960365af5a..eb42b7d5b8e443 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/IncrementalTaskTest.java @@ -54,7 +54,7 @@ static void beforeClass() { @BeforeEach void setUp() { MigrationTaskConfiguration taskConfig = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig(); - taskConfig.getDumperContext().setPosition(new PlaceholderPosition()); + taskConfig.getDumperContext().getCommonContext().setPosition(new PlaceholderPosition()); incrementalTask = new IncrementalTask("ds_0", PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), Collections.singletonList(mock(Importer.class)), new IncrementalTaskProgress(new PlaceholderPosition())); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java index 8dcd785c33af4b..9900dc395e76e5 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java @@ -76,7 +76,7 @@ void assertGetProgress() throws SQLException, ExecutionException, InterruptedExc initTableData(taskConfig.getDumperContext()); // TODO use t_order_0, and also others InventoryDumperContext inventoryDumperContext = createInventoryDumperContext("t_order", "t_order"); - AtomicReference position = new AtomicReference<>(inventoryDumperContext.getPosition()); + AtomicReference position = new AtomicReference<>(inventoryDumperContext.getCommonContext().getPosition()); InventoryTask inventoryTask = new InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperContext), PipelineContextUtils.getExecuteEngine(), PipelineContextUtils.getExecuteEngine(), mock(Dumper.class), mock(Importer.class), position); CompletableFuture.allOf(inventoryTask.start().toArray(new CompletableFuture[0])).get(10L, TimeUnit.SECONDS); @@ -87,7 +87,7 @@ void assertGetProgress() throws SQLException, ExecutionException, InterruptedExc private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException { PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); try ( - PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig()); + PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()); Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement()) { statement.execute("DROP TABLE IF EXISTS t_order"); @@ -98,11 +98,12 @@ private void initTableData(final IncrementalDumperContext dumperContext) throws } private InventoryDumperContext createInventoryDumperContext(final String logicTableName, final String actualTableName) { - InventoryDumperContext result = new InventoryDumperContext(taskConfig.getDumperContext()); + InventoryDumperContext result = new InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()); result.setLogicTableName(logicTableName); result.setActualTableName(actualTableName); result.setUniqueKeyColumns(Collections.singletonList(PipelineContextUtils.mockOrderIdColumnMetaData())); - result.setPosition(null == taskConfig.getDumperContext().getPosition() ? new IntegerPrimaryKeyPosition(0, 1000) : taskConfig.getDumperContext().getPosition()); + result.getCommonContext().setPosition( + null == taskConfig.getDumperContext().getCommonContext().getPosition() ? new IntegerPrimaryKeyPosition(0, 1000) : taskConfig.getDumperContext().getCommonContext().getPosition()); return result; } } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java index c500b0e370e22a..23f8210a468039 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java @@ -74,7 +74,7 @@ private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProg private MigrationJobConfiguration createJobConfiguration() throws SQLException { MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()); - initTableData(jobItemContext.getTaskConfig().getDumperContext().getDataSourceConfig()); + initTableData(jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig()); initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig()); return jobItemContext.getJobConfig(); }