From e47992398a4bab30a461b48052869fa67060c081 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 5 Nov 2023 21:40:21 +0800 Subject: [PATCH] Refactor DumperCommonContext --- .../dumper/context/DumperCommonContext.java | 10 ++++--- .../context/InventoryDumperContext.java | 7 ++--- .../ingest/MySQLIncrementalDumperTest.java | 8 +++--- .../ingest/PostgreSQLWALDumperTest.java | 8 +++--- .../ingest/wal/WALEventConverterTest.java | 8 +++--- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 12 +++------ ...rationIncrementalDumperContextCreator.java | 27 +++++-------------- 7 files changed, 30 insertions(+), 50 deletions(-) diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java index cdcbf6b0f3f89..fc97bb8c81318 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; @@ -28,18 +29,19 @@ /** * Dumper common context. */ +@RequiredArgsConstructor @Getter @Setter @ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"}) public final class DumperCommonContext { - private String dataSourceName; + private final String dataSourceName; - private PipelineDataSourceConfiguration dataSourceConfig; + private final PipelineDataSourceConfiguration dataSourceConfig; - private ActualAndLogicTableNameMapper tableNameMapper; + private final ActualAndLogicTableNameMapper tableNameMapper; - private TableAndSchemaNameMapper tableAndSchemaNameMapper; + private final TableAndSchemaNameMapper tableAndSchemaNameMapper; private IngestPosition position; } diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java index 2e1d44438d7e8..b39cffbb8ac01 100644 --- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java +++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java @@ -54,11 +54,8 @@ public final class InventoryDumperContext { private JobRateLimitAlgorithm rateLimitAlgorithm; public InventoryDumperContext(final DumperCommonContext commonContext) { - this.commonContext = new DumperCommonContext(); - this.commonContext.setDataSourceName(commonContext.getDataSourceName()); - this.commonContext.setDataSourceConfig(commonContext.getDataSourceConfig()); - this.commonContext.setTableNameMapper(commonContext.getTableNameMapper()); - this.commonContext.setTableAndSchemaNameMapper(commonContext.getTableAndSchemaNameMapper()); + this.commonContext = new DumperCommonContext( + commonContext.getDataSourceName(), commonContext.getDataSourceConfig(), commonContext.getTableNameMapper(), commonContext.getTableAndSchemaNameMapper()); } /** diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java index c83e3ede6d324..589c7513578ce 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java @@ -97,10 +97,10 @@ void setUp() throws SQLException { } private IncrementalDumperContext createDumperContext() { - DumperCommonContext commonContext = new DumperCommonContext(); - commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root")); - commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); - commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); + DumperCommonContext commonContext = new DumperCommonContext(null, + new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"), + new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))), + new TableAndSchemaNameMapper(Collections.emptyMap())); return new IncrementalDumperContext(commonContext); } diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java index 4a033b76e6304..efa87a96409d1 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java @@ -107,10 +107,10 @@ private void createTable(final String jdbcUrl, final String username, final Stri } private IncrementalDumperContext createDumperContext(final String jdbcUrl, final String username, final String password) { - DumperCommonContext commonContext = new DumperCommonContext(); - commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password)); - commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")))); - commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); + DumperCommonContext commonContext = new DumperCommonContext(null, + new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password), + new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))), + new TableAndSchemaNameMapper(Collections.emptyMap())); IncrementalDumperContext result = new IncrementalDumperContext(commonContext); result.setJobId("0101123456"); return result; diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java index 29c625de0a7d4..cdda6d2d11da7 100644 --- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java +++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java @@ -86,10 +86,10 @@ void setUp() throws SQLException { } private IncrementalDumperContext mockDumperContext() { - DumperCommonContext commonContext = new DumperCommonContext(); - commonContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root")); - commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")))); - commonContext.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap())); + DumperCommonContext commonContext = new DumperCommonContext(null, + new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"), + new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))), + new TableAndSchemaNameMapper(Collections.emptyMap())); return new IncrementalDumperContext(commonContext); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index d25feef3a3b88..4232b335bf32c 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -279,16 +279,12 @@ public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguratio private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem); - Map tableNameMap = new LinkedHashMap<>(); - dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName())))); String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName(); StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName); - DumperCommonContext commonContext = new DumperCommonContext(); - commonContext.setDataSourceName(dataSourceName); - commonContext.setDataSourceConfig(actualDataSourceConfig); - commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); - commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); - IncrementalDumperContext result = new IncrementalDumperContext(commonContext); + Map tableNameMap = new LinkedHashMap<>(); + dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName())))); + IncrementalDumperContext result = new IncrementalDumperContext( + new DumperCommonContext(dataSourceName, actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap), tableAndSchemaNameMapper)); result.setJobId(jobConfig.getJobId()); result.setDecodeWithTX(jobConfig.isDecodeWithTX()); return result; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java index 764b22dec926d..f3819d82ff5c3 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java @@ -19,21 +19,16 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.DumperCommonContext; +import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper; import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper; -import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext; -import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName; -import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName; import org.apache.shardingsphere.data.pipeline.common.config.ingest.IncrementalDumperContextCreator; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; -import java.util.Map; - /** - * Migration incremental dumper configuration creator. + * Migration incremental dumper context creator. */ @RequiredArgsConstructor public final class MigrationIncrementalDumperContextCreator implements IncrementalDumperContextCreator { @@ -42,21 +37,11 @@ public final class MigrationIncrementalDumperContextCreator implements Increment @Override public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDataNodeLine) { - Map tableNameMap = JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine); - TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); String dataSourceName = jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName(); - return buildDumperContext(jobConfig.getJobId(), dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMap, tableAndSchemaNameMapper); - } - - private IncrementalDumperContext buildDumperContext(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource, - final Map tableNameMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { - DumperCommonContext commonContext = new DumperCommonContext(); - commonContext.setDataSourceName(dataSourceName); - commonContext.setDataSourceConfig(sourceDataSource); - commonContext.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap)); - commonContext.setTableAndSchemaNameMapper(tableAndSchemaNameMapper); - IncrementalDumperContext result = new IncrementalDumperContext(commonContext); - result.setJobId(jobId); + ActualAndLogicTableNameMapper tableNameMapper = new ActualAndLogicTableNameMapper(JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine)); + TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); + IncrementalDumperContext result = new IncrementalDumperContext(new DumperCommonContext(dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMapper, tableAndSchemaNameMapper)); + result.setJobId(jobConfig.getJobId()); return result; } }