From f070d1dc6283e0c4662766825008e0e28559d61f Mon Sep 17 00:00:00 2001
From: zhangliang
Date: Fri, 1 Dec 2023 15:10:56 +0800
Subject: [PATCH] Remove TransmissionJobOption.buildTaskConfiguration()

---
 .../job/option/TransmissionJobOption.java       | 12 ----
 .../data/pipeline/cdc/CDCJob.java               | 54 ++++++++++++++--
 .../data/pipeline/cdc/CDCJobOption.java         | 51 ---------------
 .../scenario/migration/MigrationJob.java        | 63 ++++++++++++++++++-
 .../migration/MigrationJobOption.java           | 63 -------------------
 .../core/util/PipelineContextUtils.java         | 55 +++++++++++++++-
 6 files changed, 164 insertions(+), 134 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
index 2564468586f5e..3582b71ebe60c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
@@ -18,13 +18,11 @@
 package org.apache.shardingsphere.data.pipeline.core.job.option;
 
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 
 /**
  * Transmission job option.
@@ -45,16 +43,6 @@ default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
      */
     PipelineJobInfo getJobInfo(String jobId);
 
-    /**
-     * Build task configuration.
-     *
-     * @param jobConfig pipeline job configuration
-     * @param jobShardingItem job sharding item
-     * @param processConfig pipeline process configuration
-     * @return task configuration
-     */
-    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration processConfig);
-
     /**
      * Build transmission process context.
      *
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 84cdcc78ed9e1..c70c81d012245 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -20,25 +20,38 @@
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
-import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
 import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -55,8 +68,11 @@
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 /**
  * CDC job.
@@ -93,7 +109,7 @@ public void execute(final ShardingContext shardingContext) {
                 log.info("stopping true, ignore");
                 return;
             }
-            CDCJobItemContext jobItemContext = buildPipelineJobItemContext(jobConfig, shardingItem);
+            CDCJobItemContext jobItemContext = buildCDCJobItemContext(jobConfig, shardingItem);
             PipelineTasksRunner tasksRunner = new CDCTasksRunner(jobItemContext);
             if (!addTasksRunner(shardingItem, tasksRunner)) {
                 continue;
@@ -111,13 +127,43 @@ public void execute(final ShardingContext shardingContext) {
         executeIncrementalTasks(jobItemContexts);
     }
 
-    private CDCJobItemContext buildPipelineJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
+    private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
         CDCProcessContext jobProcessContext = jobOption.buildProcessContext(jobConfig);
-        CDCTaskConfiguration taskConfig = jobOption.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+        CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
         return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
     }
 
+    private CDCTaskConfiguration buildTaskConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
+        TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
+        IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableAndSchemaNameMapper);
+        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
+        CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig);
+        log.debug("buildTaskConfiguration, result={}", result);
+        return result;
+    }
+
+    private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
+        String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
+        StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
+        return new IncrementalDumperContext(
+                new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
+                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
+    }
+
+    private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
+                                                             final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
+                jobConfig.getDataSourceConfig().getParameter());
+        CDCProcessContext processContext = new CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
+        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
+                .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
+    }
+
     private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {
         try {
             jobPreparer.initTasks(jobItemContexts);
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
index 081e86e82474e..331017ac3f3c3 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
@@ -18,39 +18,20 @@
 package org.apache.shardingsphere.data.pipeline.cdc;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
-import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
-import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 /**
  * CDC job option.
  */
@@ -80,38 +61,6 @@ public PipelineJobInfo getJobInfo(final String jobId) {
         return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
     }
 
-    @Override
-    public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
-        CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;
-        TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
-        IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableAndSchemaNameMapper);
-        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
-        CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig);
-        log.debug("buildTaskConfiguration, result={}", result);
-        return result;
-    }
-
-    private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
-        String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
-        StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
-        return new IncrementalDumperContext(
-                new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
-                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
-    }
-
-    private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
-                                                             final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
-                jobConfig.getDataSourceConfig().getParameter());
-        CDCProcessContext processContext = new CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
-        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
-                .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
-    }
-
     @Override
     public CDCProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
         TransmissionJobManager jobManager = new TransmissionJobManager(this);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index ccdadabf7f9cc..63fdd02f23d64 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -18,25 +18,46 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.MigrationJobPreparer;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.datanode.DataNode;
 
 import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Migration job.
@@ -63,10 +84,48 @@ protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingC
         MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
         MigrationProcessContext jobProcessContext = jobOption.buildProcessContext(jobConfig);
-        MigrationTaskConfiguration taskConfig = jobOption.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+        MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
         return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
     }
 
+    private MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
+        IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
+        Collection<CreateTableConfiguration> createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+        Set<CaseInsensitiveIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
+        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
+                ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
+        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+        MigrationTaskConfiguration result = new MigrationTaskConfiguration(
+                incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
+        log.info("buildTaskConfiguration, result={}", result);
+        return result;
+    }
+
+    private Collection<CreateTableConfiguration> buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        Collection<CreateTableConfiguration> result = new LinkedList<>();
+        for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) {
+            String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
+            DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
+            String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
+            DataNode dataNode = each.getDataNodes().get(0);
+            PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName());
+            CreateTableConfiguration createTableConfig = new CreateTableConfiguration(sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
+                    jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
+            result.add(createTableConfig);
+        }
+        log.info("buildCreateTableConfigurations, result={}", result);
+        return result;
+    }
+
+    private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
+                                                             final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
+        int retryTimes = jobConfig.getRetryTimes();
+        int concurrency = jobConfig.getConcurrency();
+        return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
+    }
+
     @Override
     protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
         return new TransmissionTasksRunner((TransmissionJobItemContext) pipelineJobItemContext);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
index d2554aed14b57..03b8f537bf418 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
@@ -18,45 +18,26 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
-import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
-import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.datanode.DataNode;
 
 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * Migration job option.
@@ -94,50 +75,6 @@ public PipelineJobInfo getJobInfo(final String jobId) {
         return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables));
     }
 
-    @Override
-    public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
-        MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
-        IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(
-                jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
-        Collection<CreateTableConfiguration> createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        Set<CaseInsensitiveIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
-                ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
-        ImporterConfiguration importerConfig = buildImporterConfiguration(
-                jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        MigrationTaskConfiguration result = new MigrationTaskConfiguration(
-                incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
-        log.info("buildTaskConfiguration, result={}", result);
-        return result;
-    }
-
-    private Collection<CreateTableConfiguration> buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        Collection<CreateTableConfiguration> result = new LinkedList<>();
-        for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) {
-            String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
-            DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
-            String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
-            DataNode dataNode = each.getDataNodes().get(0);
-            PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName());
-            CreateTableConfiguration createTableConfig = new CreateTableConfiguration(
-                    sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
-                    jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
-            result.add(createTableConfig);
-        }
-        log.info("buildCreateTableConfigurations, result={}", result);
-        return result;
-    }
-
-    private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
-                                                             final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
-        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        int retryTimes = jobConfig.getRetryTimes();
-        int concurrency = jobConfig.getConcurrency();
-        return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
-    }
-
     @Override
     public MigrationProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
         PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index 3faeb76ed4ab1..efa91b9cae21a 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -18,7 +18,10 @@
 package org.apache.shardingsphere.test.it.data.pipeline.core.util;
 
 import lombok.SneakyThrows;
+import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPipelineProcessConfiguration;
@@ -27,17 +30,27 @@
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
 import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
 import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
@@ -57,9 +70,13 @@
 import java.sql.Types;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Pipeline context utility class.
@@ -166,7 +183,7 @@ public static MigrationJobItemContext mockMigrationJobItemContext(final Migratio
         PipelineProcessConfiguration processConfig = mockPipelineProcessConfiguration();
         MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), processConfig);
         int jobShardingItem = 0;
-        MigrationTaskConfiguration taskConfig = new MigrationJobOption().buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
+        MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
         return new MigrationJobItemContext(jobConfig, jobShardingItem, null, processContext, taskConfig, new DefaultPipelineDataSourceManager());
     }
 
@@ -178,4 +195,38 @@ private static PipelineProcessConfiguration mockPipelineProcessConfiguration() {
         PipelineProcessConfigurationUtils.fillInDefaultValue(yamlProcessConfig);
         return new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
     }
+
+    private static MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
+        IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
+        Collection<CreateTableConfiguration> createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+        Set<CaseInsensitiveIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
+        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
+                ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
+        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+        return new MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
+    }
+
+    private static Collection<CreateTableConfiguration> buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        Collection<CreateTableConfiguration> result = new LinkedList<>();
+        for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) {
+            String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
+            DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
+            String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
+            DataNode dataNode = each.getDataNodes().get(0);
+            PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName());
+            CreateTableConfiguration createTableConfig = new CreateTableConfiguration(sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
+                    jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
+            result.add(createTableConfig);
+        }
+        return result;
+    }
+
+    private static ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
+                                                                    final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
+        int retryTimes = jobConfig.getRetryTimes();
+        int concurrency = jobConfig.getConcurrency();
+        return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
+    }
 }