From 3f85b14f63b5f229c252e3a2bf24b1d00e566410 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sun, 5 Nov 2023 22:33:57 +0800 Subject: [PATCH] Refactor DumperCommonContext (#28948) * Refactor DumperCommonContext * Refactor IncrementalDumperContext --- .../dumper/context/DumperCommonContext.java | 10 ++++--- .../context/IncrementalDumperContext.java | 6 ++-- .../context/InventoryDumperContext.java | 7 ++--- .../ingest/MySQLIncrementalDumperTest.java | 10 +++---- .../ingest/PostgreSQLWALDumperTest.java | 12 ++++---- .../ingest/wal/WALEventConverterTest.java | 10 +++---- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 16 ++++------- ...rationIncrementalDumperContextCreator.java | 28 ++++--------------- 8 files changed, 36 insertions(+), 63 deletions(-) diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java index cdcbf6b0f3f89..fc97bb8c81318 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; @@ -28,18 +29,19 @@ /** * Dumper common context. */ +@RequiredArgsConstructor @Getter @Setter @ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"}) public final class DumperCommonContext { - private String dataSourceName; + private final String dataSourceName; - private PipelineDataSourceConfiguration dataSourceConfig; + private final PipelineDataSourceConfiguration dataSourceConfig; - private ActualAndLogicTableNameMapper tableNameMapper; + private final ActualAndLogicTableNameMapper tableNameMapper; - private TableAndSchemaNameMapper tableAndSchemaNameMapper; + private final TableAndSchemaNameMapper tableAndSchemaNameMapper; private IngestPosition position; } diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java index 4b7697c7dfaf6..4bcb2ce0d4613 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/IncrementalDumperContext.java @@ -19,7 +19,6 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; -import lombok.Setter; import lombok.ToString; /** @@ -27,13 +26,12 @@ */ @RequiredArgsConstructor @Getter -@Setter @ToString public final class IncrementalDumperContext { private final DumperCommonContext commonContext; - private String jobId; + private final String jobId; - private boolean decodeWithTX; + private final boolean decodeWithTX; } diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java index 2e1d44438d7e8..b39cffbb8ac01 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java @@ -54,11 +54,8 @@ public final class InventoryDumperContext { private JobRateLimitAlgorithm rateLimitAlgorithm; public InventoryDumperContext(final DumperCommonContext commonContext) { - this.commonContext = new DumperCommonContext(); - this.commonContext.setDataSourceName(commonContext.getDataSourceName()); - this.commonContext.setDataSourceConfig(commonContext.getDataSourceConfig()); - this.commonContext.setTableNameMapper(commonContext.getTableNameMapper()); - this.commonContext.setTableAndSchemaNameMapper(commonContext.getTableAndSchemaNameMapper()); + this.commonContext = new DumperCommonContext( + commonContext.getDataSourceName(), commonContext.getDataSourceConfig(), commonContext.getTableNameMapper(), commonContext.getTableAndSchemaNameMapper()); } /** diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index c83e3ede6d324..bd8ee875b1327 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -97,11 +97,11 @@ void setUp() throws SQLException { } private IncrementalDumperContext createDumperContext() { - DumperCommonContext commonContext = new DumperCommonContext(); - commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root")); - commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); - commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); - return new IncrementalDumperContext(commonContext); + DumperCommonContext commonContext = new DumperCommonContext(null, + new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"), + new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))), + new TableAndSchemaNameMapper(Collections.emptyMap())); + return new IncrementalDumperContext(commonContext, null, false); } private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException { diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index 4a033b76e6304..a6a90a7981774 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -107,13 +107,11 @@ private void createTable(final String jdbcUrl, final String username, final Stri } private IncrementalDumperContext createDumperContext(final String jdbcUrl, final String username, final String password) { - DumperCommonContext commonContext = new DumperCommonContext(); - commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password)); - commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")))); - commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); - IncrementalDumperContext result = new IncrementalDumperContext(commonContext); - result.setJobId("0101123456"); - return result; + DumperCommonContext commonContext = new DumperCommonContext(null, + new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password), + new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))), + new TableAndSchemaNameMapper(Collections.emptyMap())); + return new IncrementalDumperContext(commonContext, "0101123456", false); } @AfterEach diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index 29c625de0a7d4..9306159d72b5e 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -86,11 +86,11 @@ void setUp() throws SQLException { } private IncrementalDumperContext mockDumperContext() { - DumperCommonContext commonContext = new DumperCommonContext(); - commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root")); - commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); - commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); - return new IncrementalDumperContext(commonContext); + DumperCommonContext commonContext = new DumperCommonContext(null, + new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"), + new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))), + new TableAndSchemaNameMapper(Collections.emptyMap())); + return new IncrementalDumperContext(commonContext, null, false); } private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException { diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index d25feef3a3b88..3b96545fde320 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -279,19 +279,13 @@ public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguratio private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem); - Map tableNameMap = new LinkedHashMap<>(); - dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName())))); String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName(); StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName); - DumperCommonContext commonContext = new DumperCommonContext(); - commonContext.setDataSourceName(dataSourceName); - commonContext.setDataSourceConfig(actualDataSourceConfig); - commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); - commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); - IncrementalDumperContext result = new IncrementalDumperContext(commonContext); - result.setJobId(jobConfig.getJobId()); - result.setDecodeWithTX(jobConfig.isDecodeWithTX()); - return result; + Map tableNameMap = new LinkedHashMap<>(); + dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName())))); + return new IncrementalDumperContext( + new DumperCommonContext(dataSourceName, actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap), tableAndSchemaNameMapper), + jobConfig.getJobId(), jobConfig.isDecodeWithTX()); } private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection schemaTableNames, diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java index 764b22dec926d..a5ee57453347c 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java @@ -19,21 +19,16 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; -import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; -import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.common.config.ingest.IncrementalDumperContextCreator; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; -import java.util.Map; - /** - * Migration incremental dumper configuration creator. + * Migration incremental dumper context creator. */ @RequiredArgsConstructor public final class MigrationIncrementalDumperContextCreator implements IncrementalDumperContextCreator { @@ -42,21 +37,10 @@ public final class MigrationIncrementalDumperContextCreator implements Increment @Override public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDataNodeLine) { - Map tableNameMap = JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine); - TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); String dataSourceName = jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName(); - return buildDumperContext(jobConfig.getJobId(), dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMap, tableAndSchemaNameMapper); - } - - private IncrementalDumperContext buildDumperContext(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource, - final Map tableNameMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { - DumperCommonContext commonContext = new DumperCommonContext(); - commonContext.setDataSourceName(dataSourceName); - commonContext.setDataSourceConfig(sourceDataSource); - commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); - commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); - IncrementalDumperContext result = new IncrementalDumperContext(commonContext); - result.setJobId(jobId); - return result; + ActualAndLogicTableNameMapper tableNameMapper = new ActualAndLogicTableNameMapper(JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine)); + TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); + return new IncrementalDumperContext( + new DumperCommonContext(dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMapper, tableAndSchemaNameMapper), jobConfig.getJobId(), false); } }