diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index e307fe0f67959..78eecd175d1b6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -26,8 +26,8 @@
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
 import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
@@ -55,7 +55,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
     private final String jobId;
 
     @Getter(AccessLevel.PROTECTED)
-    private final PipelineJobAPI jobAPI;
+    private final PipelineJobOption jobOption;
 
     private final AtomicBoolean stopping = new AtomicBoolean(false);
 
@@ -65,7 +65,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
 
     protected AbstractPipelineJob(final String jobId) {
         this.jobId = jobId;
-        jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobId).getType());
+        jobOption = TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobId).getType());
     }
 
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 91358b03a97ce..35651e55fb4c1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -61,7 +61,7 @@ public void execute(final ShardingContext shardingContext) {
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
-            processFailed(new PipelineJobManager(getJobAPI()), jobId, shardingItem, ex);
+            processFailed(new PipelineJobManager(getJobOption()), jobId, shardingItem, ex);
             throw ex;
         }
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
similarity index 96%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
index 5e4c86b125f52..55f0c235f1d2a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job.service;
+package org.apache.shardingsphere.data.pipeline.core.job.option;
 
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
@@ -30,10 +30,10 @@
 import java.util.Optional;
 
 /**
- * Pipeline job API.
+ * Pipeline job option.
  */
 @SingletonSPI
-public interface PipelineJobAPI extends TypedSPI {
+public interface PipelineJobOption extends TypedSPI {
 
     /**
      * Get YAML pipeline job configuration swapper.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
new file mode 100644
index 0000000000000..edf23ebc52563
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.option;
+
+import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
+
+/**
+ * Transmission job option.
+ */
+public interface TransmissionJobOption extends PipelineJobOption {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
+        return new YamlTransmissionJobItemProgressSwapper();
+    }
+
+    /**
+     * Get pipeline job info.
+     *
+     * @param jobId job ID
+     * @return pipeline job info
+     */
+    PipelineJobInfo getJobInfo(String jobId);
+
+    /**
+     * Build task configuration.
+     *
+     * @param jobConfig pipeline job configuration
+     * @param jobShardingItem job sharding item
+     * @param processConfig pipeline process configuration
+     * @return task configuration
+     */
+    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration processConfig);
+
+    /**
+     * Build transmission process context.
+     *
+     * @param jobConfig pipeline job configuration
+     * @return transmission process context
+     */
+    TransmissionProcessContext buildProcessContext(PipelineJobConfiguration jobConfig);
+
+    /**
+     * Extend YAML job configuration.
+     *
+     * @param contextKey context key
+     * @param yamlJobConfig YAML job configuration
+     */
+    void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig);
+
+    /**
+     * Build pipeline data consistency checker.
+     *
+     * @param jobConfig job configuration
+     * @param processContext process context
+     * @param progressContext consistency check job item progress context
+     * @return all logic tables check result
+     */
+    PipelineDataConsistencyChecker buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, TransmissionProcessContext processContext, ConsistencyCheckJobItemProgressContext progressContext);
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 143ba94c50b6e..0f7bc16a222ce 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -23,7 +23,7 @@
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -130,7 +130,7 @@ private static synchronized void persist(final String jobId, final int shardingI
         }
         persistContext.getHasNewEvents().set(false);
         long startTimeMillis = System.currentTimeMillis();
-        new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobId).getType())
+        new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobId).getType())
                 .getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
         persistContext.getBeforePersistingProgressMillis().set(null);
         if (6 == ThreadLocalRandom.current().nextInt(100)) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
index 898c162eeaa2e..cf33f799c712d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
@@ -21,6 +21,7 @@
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.datetime.StandardDateTimeFormatter;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -34,7 +35,7 @@
 @RequiredArgsConstructor
 public final class PipelineJobConfigurationManager {
 
-    private final PipelineJobAPI jobAPI;
+    private final PipelineJobOption jobOption;
 
     /**
      * Get job configuration.
@@ -45,7 +46,7 @@
      */
    @SuppressWarnings("unchecked")
    public <T extends PipelineJobConfiguration> T getJobConfiguration(final String jobId) {
-        return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
+        return (T) jobOption.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
     }
 
     /**
@@ -57,9 +58,9 @@ public <T extends PipelineJobConfiguration> T getJobConfiguration(final String j
     public JobConfigurationPOJO convertToJobConfigurationPOJO(final PipelineJobConfiguration jobConfig) {
         JobConfigurationPOJO result = new JobConfigurationPOJO();
         result.setJobName(jobConfig.getJobId());
-        int shardingTotalCount = jobAPI.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount();
+        int shardingTotalCount = jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount();
         result.setShardingTotalCount(shardingTotalCount);
-        result.setJobParameter(YamlEngine.marshal(jobAPI.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
+        result.setJobParameter(YamlEngine.marshal(jobOption.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
         String createTimeFormat = LocalDateTime.now().format(StandardDateTimeFormatter.get());
         result.getProps().setProperty("create_time", createTimeFormat);
         result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 82325787f0e89..9d391d957c855 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -30,6 +30,8 @@
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -49,7 +51,7 @@
 @Slf4j
 public final class PipelineJobManager {
 
-    private final PipelineJobAPI jobAPI;
+    private final PipelineJobOption jobOption;
 
     /**
      * Start job.
@@ -65,8 +67,8 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
             log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId);
             return Optional.of(jobId);
         }
-        governanceFacade.getJobFacade().getJob().create(jobId, jobAPI.getJobClass());
-        governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobAPI).convertToJobConfigurationPOJO(jobConfig));
+        governanceFacade.getJobFacade().getJob().create(jobId, jobOption.getJobClass());
+        governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig));
         return Optional.of(jobId);
     }
 
@@ -76,15 +78,15 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
      * @param jobId job id
      */
     public void startDisabledJob(final String jobId) {
-        if (jobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
-            Optional<PipelineJobItemProgress> jobItemProgress = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()).getProgress(jobId, 0);
+        if (jobOption.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
+            Optional<PipelineJobItemProgress> jobItemProgress = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()).getProgress(jobId, 0);
             if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) {
                 log.info("job status is FINISHED, ignore, jobId={}", jobId);
                 return;
             }
         }
         startCurrentDisabledJob(jobId);
-        jobAPI.getToBeStartDisabledNextJobType().ifPresent(optional -> startNextDisabledJob(jobId, optional));
+        jobOption.getToBeStartDisabledNextJobType().ifPresent(optional -> startNextDisabledJob(jobId, optional));
     }
 
@@ -107,7 +109,7 @@ private void startCurrentDisabledJob(final String jobId) {
     private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) {
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> {
             try {
-                new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, toBeStartDisabledNextJobType)).startDisabledJob(optional);
+                new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, toBeStartDisabledNextJobType)).startDisabledJob(optional);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
@@ -122,14 +124,14 @@ private void startNextDisabledJob(final String jobId, final String toBeStartDisa
      * @param jobId job id
      */
     public void stop(final String jobId) {
-        jobAPI.getToBeStoppedPreviousJobType().ifPresent(optional -> stopPreviousJob(jobId, optional));
+        jobOption.getToBeStoppedPreviousJobType().ifPresent(optional -> stopPreviousJob(jobId, optional));
         stopCurrentJob(jobId);
     }
 
     private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) {
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> {
             try {
-                new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, toBeStoppedPreviousJobType)).stop(optional);
+                new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, toBeStoppedPreviousJobType)).stop(optional);
                 // CHECKSTYLE:OFF
             } catch (final RuntimeException ex) {
                 // CHECKSTYLE:ON
@@ -172,10 +174,10 @@ public void drop(final String jobId) {
      * @return jobs info
      */
     public List<PipelineJobInfo> getJobInfos(final PipelineContextKey contextKey) {
-        if (jobAPI instanceof TransmissionJobAPI) {
+        if (jobOption instanceof TransmissionJobOption) {
             return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
-                    .filter(each -> !each.getJobName().startsWith("_") && jobAPI.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
-                    .map(each -> ((TransmissionJobAPI) jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
+                    .filter(each -> !each.getJobName().startsWith("_") && jobOption.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()))
+                    .map(each -> ((TransmissionJobOption) jobOption).getJobInfo(each.getJobName())).collect(Collectors.toList());
         }
         return Collections.emptyList();
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java
index 8a1def38509fb..b75f51bbc8d4a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java
@@ -17,73 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
-import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
+import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 
 import java.sql.SQLException;
 
 /**
  * Transmission job API.
  */
-public interface TransmissionJobAPI extends PipelineJobAPI {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
-        return new YamlTransmissionJobItemProgressSwapper();
-    }
-
-    /**
-     * Get pipeline job info.
-     *
-     * @param jobId job ID
-     * @return pipeline job info
-     */
-    PipelineJobInfo getJobInfo(String jobId);
-
-    /**
-     * Build task configuration.
-     *
-     * @param jobConfig pipeline job configuration
-     * @param jobShardingItem job sharding item
-     * @param processConfig pipeline process configuration
-     * @return task configuration
-     */
-    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration processConfig);
-
-    /**
-     * Build transmission process context.
-     *
-     * @param jobConfig pipeline job configuration
-     * @return transmission process context
-     */
-    TransmissionProcessContext buildProcessContext(PipelineJobConfiguration jobConfig);
-
-    /**
-     * Extend YAML job configuration.
-     *
-     * @param contextKey context key
-     * @param yamlJobConfig YAML job configuration
-     */
-    void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig);
-
-    /**
-     * Build pipeline data consistency checker.
-     *
-     * @param jobConfig job configuration
-     * @param processContext process context
-     * @param progressContext consistency check job item progress context
-     * @return all logic tables check result
-     */
-    PipelineDataConsistencyChecker buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, TransmissionProcessContext processContext, ConsistencyCheckJobItemProgressContext progressContext);
+@SingletonSPI
+public interface TransmissionJobAPI extends TypedSPI {
 
     /**
      * Commit pipeline job.
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index e6b8e9803796c..7d1ceae8edc04 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -27,6 +27,7 @@
 import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 
@@ -44,7 +45,7 @@
 @RequiredArgsConstructor
 public final class TransmissionJobManager {
 
-    private final TransmissionJobAPI jobAPI;
+    private final TransmissionJobOption jobOption;
 
     private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
 
@@ -56,7 +57,7 @@
      */
     public void alterProcessConfiguration(final PipelineContextKey contextKey, final PipelineProcessConfiguration processConfig) {
         // TODO check rateLimiter type match or not
-        processConfigPersistService.persist(contextKey, jobAPI.getType(), processConfig);
+        processConfigPersistService.persist(contextKey, jobOption.getType(), processConfig);
     }
 
     /**
@@ -66,7 +67,7 @@ public void alterProcessConfiguration(final PipelineContextKey contextKey, final
      * @return process configuration, non-null
      */
     public PipelineProcessConfiguration showProcessConfiguration(final PipelineContextKey contextKey) {
-        return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, jobAPI.getType()));
+        return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, jobOption.getType()));
     }
 
     /**
@@ -76,11 +77,11 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte
      * @return job item infos
      */
     public List<TransmissionJobItemInfo> getJobItemInfos(final String jobId) {
-        PipelineJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobAPI).getJobConfiguration(jobId);
+        PipelineJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
         long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, TransmissionJobItemProgress> jobProgress = getJobProgress(jobConfig);
         List<TransmissionJobItemInfo> result = new LinkedList<>();
-        PipelineJobInfo jobInfo = jobAPI.getJobInfo(jobId);
+        PipelineJobInfo jobInfo = jobOption.getJobInfo(jobId);
         for (Entry<Integer, TransmissionJobItemProgress> entry : jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
             TransmissionJobItemProgress jobItemProgress = entry.getValue();
@@ -107,7 +108,7 @@ public List<TransmissionJobItemInfo> getJobItemInfos(final String jobId) {
      * @return each sharding item progress
      */
     public Map<Integer, TransmissionJobItemProgress> getJobProgress(final PipelineJobConfiguration jobConfig) {
-        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+        PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
         String jobId = jobConfig.getJobId();
         JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index 9571b6c093412..6291279a01902 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -28,10 +28,10 @@
 import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -56,7 +56,7 @@ public class TransmissionTasksRunner implements PipelineTasksRunner {
 
     private final Collection<PipelineTask> incrementalTasks;
 
-    private final PipelineJobAPI jobAPI;
+    private final PipelineJobOption jobOption;
 
     private final PipelineJobManager jobManager;
 
@@ -66,9 +66,9 @@ public TransmissionTasksRunner(final TransmissionJobItemContext jobItemContext) {
         this.jobItemContext = jobItemContext;
         inventoryTasks = jobItemContext.getInventoryTasks();
         incrementalTasks = jobItemContext.getIncrementalTasks();
-        jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
-        jobManager = new PipelineJobManager(jobAPI);
-        jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+        jobOption = TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
+        jobManager = new PipelineJobManager(jobOption);
+        jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
     }
 
     @Override
@@ -89,7 +89,7 @@ public void start() {
         if (jobItemContext.isStopping()) {
             return;
         }
-        new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
+        new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
                 .getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
         if (PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
             log.info("All inventory tasks finished.");
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index 287fb6fbde8cf..add405c928fc3 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -21,9 +21,9 @@
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -41,8 +41,8 @@ public final class ShowStreamingJobStatusExecutor implements QueryableRALExecuto
 
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingStatusStatement sqlStatement) {
-        TransmissionJobAPI jobAPI = (TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, new CDCJobType().getType());
-        List<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId());
+        TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, new CDCJobType().getType());
+        List<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
         return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
     }
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index 5e4404be248f2..0b88053358512 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.cdc.distsql.handler.query;
 
 import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
-import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -35,7 +35,7 @@
  */
 public final class ShowStreamingListExecutor implements QueryableRALExecutor<ShowStreamingListStatement> {
 
-    private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new CDCJobAPI());
+    private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new CDCJobOption());
 
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingListStatement sqlStatement) {
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
index a043e01e70470..535299f0da221 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
@@ -20,9 +20,9 @@
 import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -40,7 +40,7 @@ public final class ShowStreamingRuleExecutor implements QueryableRALExecutor
 
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingRuleStatement sqlStatement) {
-        PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING"))
+        PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, "STREAMING"))
                 .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY));
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
index 47db9466af5ab..87a6a73050ee0 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/update/DropStreamingUpdater.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.cdc.distsql.handler.update;
 
 import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
-import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 
 import java.sql.SQLException;
@@ -28,7 +28,7 @@
  */
 public final class DropStreamingUpdater implements RALUpdater<DropStreamingStatement> {
 
-    private final CDCJobAPI jobAPI = new CDCJobAPI();
+    private final CDCJobOption jobAPI = new CDCJobOption();
 
     @Override
     public void executeUpdate(final String databaseName, final DropStreamingStatement sqlStatement) throws SQLException {
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
index c5b117fd9f7db..c681645f323b2 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
 import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobOption;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement;
@@ -34,7 +34,7 @@
  */
 public final class ShowMigrationCheckStatusExecutor implements QueryableRALExecutor<ShowMigrationCheckStatusStatement> {
 
-    private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
+    private final ConsistencyCheckJobOption jobAPI = new ConsistencyCheckJobOption();
 
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationCheckStatusStatement sqlStatement) {
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
index 4a1ecdf49893d..bb146b4856a08 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
@@ -19,9 +19,9 @@
 
 import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -40,8 +40,8 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto
 
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationStatusStatement sqlStatement) {
-        TransmissionJobAPI jobAPI = (TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION");
-        List<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId());
+        TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION");
+        List<TransmissionJobItemInfo> jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
         return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
     }
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index dfccb6f94053e..ae2d13c1c0817 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -21,9 +21,9 @@
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobOption;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
@@ -38,9 +38,9 @@
  */
 public final class CheckMigrationJobUpdater implements RALUpdater<CheckMigrationStatement> {
 
-    private final ConsistencyCheckJobAPI checkJobAPI = new ConsistencyCheckJobAPI();
+    private final ConsistencyCheckJobOption checkJobAPI = new ConsistencyCheckJobOption();
 
-    private final MigrationJobAPI migrationJobAPI = new MigrationJobAPI();
+    private final MigrationJobOption migrationJobOption = new MigrationJobOption();
 
     @Override
     public void executeUpdate(final String databaseName, final CheckMigrationStatement sqlStatement) throws SQLException {
@@ -48,13 +48,13 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme
         String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName();
         Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps();
         String jobId = sqlStatement.getJobId();
-        MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(migrationJobAPI).getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(migrationJobOption).getJobConfiguration(jobId);
         verifyInventoryFinished(jobConfig);
         checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
     }
 
     private void verifyInventoryFinished(final MigrationJobConfiguration jobConfig) {
-        TransmissionJobManager transmissionJobManager = new TransmissionJobManager(migrationJobAPI);
+        TransmissionJobManager transmissionJobManager = new TransmissionJobManager(migrationJobOption);
         ShardingSpherePreconditions.checkState(PipelineJobProgressDetector.isInventoryFinished(jobConfig.getJobShardingCount(), transmissionJobManager.getJobProgress(jobConfig).values()),
                 () -> new PipelineInvalidParameterException("Inventory is not finished."));
     }
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
index f8bea1788cfc6..147bf6f156265 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CommitMigrationUpdater.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
@@ -32,7 +31,7 @@ public final class CommitMigrationUpdater implements RALUpdater {
 
-    private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
+    private final ConsistencyCheckJobOption jobAPI = new ConsistencyCheckJobOption();
 
     @Override
     public void executeUpdate(final String databaseName, final DropMigrationCheckStatement sqlStatement) {
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RollbackMigrationUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RollbackMigrationUpdater.java
index 284075b05731c..44fc9ff7d967a 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RollbackMigrationUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RollbackMigrationUpdater.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.migration.distsql.statement.RollbackMigrationStatement;
@@ -32,7 +31,7 @@ public final class RollbackMigrationUpdater implements RALUpdater {
 
-    private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
+    private final ConsistencyCheckJobOption jobAPI = new ConsistencyCheckJobOption();
 
     @Override
     public void executeUpdate(final String databaseName, final StartMigrationCheckStatement sqlStatement) {
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationCheckUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationCheckUpdater.java
index 786f0fce42993..d3840b3d2f03b 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationCheckUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationCheckUpdater.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobOption;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckStatement;
 
@@ -26,7 +26,7 @@
  */
 public final class StopMigrationCheckUpdater implements RALUpdater<StopMigrationCheckStatement> {
 
-    private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
+    private final ConsistencyCheckJobOption jobAPI = new ConsistencyCheckJobOption();
 
     @Override
     public void executeUpdate(final String databaseName, final StopMigrationCheckStatement sqlStatement) {
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
similarity index 98%
rename from kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
rename to kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
index 06ac2e6bbd688..af0c219b7ef7d 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobOption.java
@@ -66,11 +66,11 @@
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -97,10 +97,10 @@
 import java.util.stream.Collectors;
 
 /**
- * CDC job API.
+ * CDC job option.
  */
 @Slf4j
-public final class CDCJobAPI implements TransmissionJobAPI {
+public final class CDCJobOption implements TransmissionJobOption {
 
     private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
 
@@ -302,10 +302,6 @@ public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
         return true;
     }
 
-    @Override
-    public void commit(final String jobId) {
-    }
-
     /**
     * Drop streaming job.
     *
@@ -328,10 +324,6 @@ private void cleanup(final CDCJobConfiguration jobConfig) {
         }
     }
 
-    @Override
-    public void rollback(final String jobId) throws SQLException {
-    }
-
     @Override
     public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext,
                                                                       final ConsistencyCheckJobItemProgressContext progressContext) {
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 78d0cd9e4b233..79d53ea16f64d 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -20,7 +20,7 @@
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
@@ -65,7 +65,7 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
     @Getter
     private final PipelineSink sink;
 
-    private final CDCJobAPI jobAPI = new CDCJobAPI();
+    private final CDCJobOption jobAPI = new CDCJobOption();
 
     private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index ad9a1942b582b..c182ac2746290 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
@@ -67,7 +67,7 @@
 @Slf4j
 public final class CDCJobPreparer {
 
-    private final CDCJobAPI jobAPI = new CDCJobAPI();
+    private final CDCJobOption jobAPI = new CDCJobOption();
 
     private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index cc0faf0536cc7..849ab9a61c074 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -21,7 +21,7 @@
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelId;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
 import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
@@ -70,7 +70,7 @@
 @Slf4j
 public final class CDCBackendHandler {
 
-    private final CDCJobAPI jobAPI = new CDCJobAPI();
+    private final CDCJobOption jobAPI = new CDCJobOption();
 
     private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(jobAPI);
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI b/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
similarity index 92%
rename from kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI
rename to kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
index b500cdc9f5837..136bec61f328d 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
-org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI
+org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 8932efa661360..f3056e72746a1 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -45,7 +45,7 @@ public ConsistencyCheckJob(final String jobId) {
     @Override
     public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) {
         ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager = new PipelineJobItemManager<>(getJobAPI().getYamlJobItemProgressSwapper());
+        PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager = new PipelineJobItemManager<>(getJobOption().getYamlJobItemProgressSwapper());
         Optional<ConsistencyCheckJobItemProgress> jobItemProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingContext.getShardingItem());
         return new ConsistencyCheckJobItemContext(jobConfig, shardingContext.getShardingItem(), JobStatus.RUNNING, jobItemProgress.orElse(null));
     }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOption.java
similarity index 98%
rename from kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOption.java
index c02766b6033b7..afd2c577040c0 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOption.java
@@ -31,8 +31,8 @@
 import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
@@ -63,10 +63,10 @@
 import java.util.stream.Collectors;
 
 /**
- * Consistency check job API.
+ * Consistency check job option.
  */
 @Slf4j
-public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
+public final class ConsistencyCheckJobOption implements PipelineJobOption {
 
     private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 291ce0108df4b..e5dc6c5ccc3ad 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -30,14 +30,14 @@
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobOption;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -53,7 +53,7 @@
 @Slf4j
 public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
 
-    private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
+    private final ConsistencyCheckJobOption jobAPI = new ConsistencyCheckJobOption();
 
     private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
 
@@ -85,7 +85,7 @@ public void start() {
         if (jobItemContext.isStopping()) {
             return;
         }
-        new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
+        new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
                 .getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
         CompletableFuture<?> future = jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
         ExecuteEngine.trigger(Collections.singletonList(future), new CheckExecuteCallback());
@@ -103,11 +103,11 @@ private final class CheckPipelineLifecycleRunnable extends AbstractPipelineLifec
         protected void runBlocking() {
             jobItemManager.persistProgress(jobItemContext);
             JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
-            TransmissionJobAPI jobAPI = (TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
-            PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobAPI).getJobConfiguration(parentJobId);
+            TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, jobType.getType());
+            PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
             try {
-                PipelineDataConsistencyChecker checker = jobAPI.buildDataConsistencyChecker(
-                        parentJobConfig, jobAPI.buildProcessContext(parentJobConfig), jobItemContext.getProgressContext());
+                PipelineDataConsistencyChecker checker = jobOption.buildDataConsistencyChecker(
+                        parentJobConfig, jobOption.buildProcessContext(parentJobConfig), jobItemContext.getProgressContext());
                 consistencyChecker.set(checker);
                 Map<String, TableDataConsistencyCheckResult> checkResultMap = checker.check(checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
                 log.info("job {} with check algorithm '{}' data consistency checker result: {}, stopping: {}",
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
similarity index 95%
rename from kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI
rename to kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
index 64e5c114f6a59..31edf286e0753 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
@@ -15,4 +15,4 @@
 # limitations under the License.
# -org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI +org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobOption diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java index 3e46a817d49bb..32ec35bdab8f4 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java @@ -18,16 +18,16 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; -import org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; -import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI; +import org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner; +import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext; @@ -45,9 +45,9 @@ @Slf4j public final class MigrationJob extends AbstractSimplePipelineJob { - private final MigrationJobAPI jobAPI = new MigrationJobAPI(); + private final MigrationJobOption jobOption = new MigrationJobOption(); - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); @@ -63,8 +63,8 @@ protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingC int shardingItem = shardingContext.getShardingItem(); MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter()); Optional initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem); - MigrationProcessContext jobProcessContext = jobAPI.buildProcessContext(jobConfig); - MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig()); + MigrationProcessContext jobProcessContext = jobOption.buildProcessContext(jobConfig); + MigrationTaskConfiguration taskConfig = jobOption.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig()); return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 8b1918c218a1a..649e0c994fad4 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -18,93 +18,26 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; -import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager; -import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils; -import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry; -import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; -import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; -import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; -import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; -import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils; -import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; -import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; -import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder; -import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor; -import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; -import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException; -import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException; -import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException; -import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; -import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId; -import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; -import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; -import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator; -import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext; -import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration; -import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper; -import org.apache.shardingsphere.infra.config.rule.RuleConfiguration; -import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties; -import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser; -import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; -import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; -import org.apache.shardingsphere.infra.database.core.type.DatabaseType; -import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory; -import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; -import org.apache.shardingsphere.infra.datanode.DataNode; -import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; -import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; -import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit; -import org.apache.shardingsphere.infra.util.json.JsonUtils; -import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration; -import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; -import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine; -import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement; -import org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry; +import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.mode.manager.ContextManager; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; -import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; /** * Migration job API. @@ -112,190 +45,24 @@ @Slf4j public final class MigrationJobAPI implements TransmissionJobAPI { - private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService(); - - /** - * Create job migration config and start. - * - * @param contextKey context key - * @param param create migration job parameter - * @return job id - */ - public String createJobAndStart(final PipelineContextKey contextKey, final MigrateTableStatement param) { - MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param)); - new PipelineJobManager(this).start(jobConfig); - return jobConfig.getJobId(); - } - - private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) { - YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration(); - result.setTargetDatabaseName(param.getTargetDatabaseName()); - Map metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION"); - Map> sourceDataNodes = new LinkedHashMap<>(); - Map configSources = new LinkedHashMap<>(); - List sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName) - .thenComparing(each -> DataNodeUtils.formatWithSchema(each.getSource()))).collect(Collectors.toList()); - YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); - for (SourceTargetEntry each : sourceTargetEntries) { - sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource()); - ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(), - () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName())); - String dataSourceName = each.getSource().getDataSourceName(); - if (configSources.containsKey(dataSourceName)) { - continue; - } - ShardingSpherePreconditions.checkState(metaDataDataSource.containsKey(dataSourceName), - () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. 
Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it.")); - Map sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName)); - StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps); - configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter())); - DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData(); - if (null == each.getSource().getSchemaName() && dialectDatabaseMetaData.isSchemaAvailable()) { - each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig)); - } - DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType(); - if (null == result.getSourceDatabaseType()) { - result.setSourceDatabaseType(sourceDatabaseType.getType()); - } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) { - throw new PipelineInvalidParameterException("Source storage units have different database types"); - } - } - result.setSources(configSources); - ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName()); - PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase); - result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter())); - result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType()); - List tablesFirstDataNodes = sourceDataNodes.entrySet().stream() - .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList()); - result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList())); - result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes)); - result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal()); - result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList())); - extendYamlJobConfiguration(contextKey, result); - return result; - } - - private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) { - YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration(); - result.setType(type); - result.setParameter(param); - return result; - } - - private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) { - Map> targetPoolProps = new HashMap<>(); - YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); - for (Entry entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) { - targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties())); - } - YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations()); - return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig); - } - - private 
YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map> yamlDataSources, final Collection rules) { - if (rules.isEmpty()) { - throw new NoAnyRuleExistsException(databaseName); - } - YamlRootConfiguration result = new YamlRootConfiguration(); - result.setDatabaseName(databaseName); - result.setDataSources(yamlDataSources); - result.setRules(new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules)); - return result; - } - - private Map buildTargetTableSchemaMap(final Map> sourceDataNodes) { - Map result = new LinkedHashMap<>(); - sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName())); - return result; - } - - @Override - public PipelineJobInfo getJobInfo(final String jobId) { - PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); - List sourceTables = new LinkedList<>(); - new PipelineJobConfigurationManager(this).getJobConfiguration(jobId).getJobShardingDataNodes() - .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode))))); - return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables)); - } - - @Override - public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) { - YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig; - if (null == yamlJobConfig.getJobId()) { - config.setJobId(new MigrationJobId(contextKey, config.getJobShardingDataNodes()).marshal()); - } - } - - @SuppressWarnings("unchecked") - @Override - public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() { - return new YamlMigrationJobConfigurationSwapper(); - } - - @Override - public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) { - MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig; - IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator( - jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); - Collection createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); - Set targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()); - Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( - ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames); - ImporterConfiguration importerConfig = buildImporterConfiguration( - jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); - MigrationTaskConfiguration result = new MigrationTaskConfiguration( - incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig); - log.info("buildTaskConfiguration, result={}", result); - return result; - } - - private Collection buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { - Collection result = new LinkedList<>(); - for (JobDataNodeEntry each : 
jobConfig.getTablesFirstDataNodes().getEntries()) { - String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName()); - DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData(); - String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null; - DataNode dataNode = each.getDataNodes().get(0); - PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName()); - CreateTableConfiguration createTableConfig = new CreateTableConfiguration( - sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()), - jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName())); - result.add(createTableConfig); - } - log.info("buildCreateTableConfigurations, result={}", result); - return result; - } - - private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, - final Map> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { - MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig); - JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm(); - int batchSize = pipelineProcessConfig.getWrite().getBatchSize(); - int retryTimes = jobConfig.getRetryTimes(); - int concurrency = jobConfig.getConcurrency(); - return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency); - } - - @Override - public MigrationProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) { - PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())); - return new MigrationProcessContext(jobConfig.getJobId(), processConfig); - } - - @Override - public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext, - final ConsistencyCheckJobItemProgressContext progressContext) { - return new MigrationDataConsistencyChecker((MigrationJobConfiguration) jobConfig, processContext, progressContext); - } - @Override - public Optional getToBeStartDisabledNextJobType() { - return Optional.of("CONSISTENCY_CHECK"); + public void commit(final String jobId) { + log.info("Commit job {}", jobId); + final long startTimeMillis = System.currentTimeMillis(); + PipelineJobManager jobManager = new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, getType())); + jobManager.stop(jobId); + dropCheckJobs(jobId); + MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobOption.class, getType())).getJobConfiguration(jobId); + refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName()); + jobManager.drop(jobId); + log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis); } - @Override - public Optional getToBeStoppedPreviousJobType() { - return Optional.of("CONSISTENCY_CHECK"); + private void refreshTableMetadata(final String jobId, final String databaseName) { + // TODO use original database name for now; wait for reloadDatabaseMetaData to fix case-sensitive problem + ContextManager contextManager = 
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager(); + ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName); + contextManager.reloadDatabaseMetaData(database.getName()); } @Override @@ -303,7 +70,7 @@ public void rollback(final String jobId) throws SQLException { final long startTimeMillis = System.currentTimeMillis(); dropCheckJobs(jobId); cleanTempTableOnRollback(jobId); - new PipelineJobManager(this).drop(jobId); + new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, getType())).drop(jobId); log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis); } @@ -314,7 +81,7 @@ private void dropCheckJobs(final String jobId) { } for (String each : checkJobIds) { try { - new PipelineJobManager(this).drop(each); + new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, getType())).drop(each); // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON @@ -324,7 +91,7 @@ private void dropCheckJobs(final String jobId) { } private void cleanTempTableOnRollback(final String jobId) throws SQLException { - MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId); + MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobOption.class, getType())).getJobConfiguration(jobId); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType()); TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); try ( @@ -341,115 +108,6 @@ private void cleanTempTableOnRollback(final String jobId) throws SQLException { } } - @Override - public void commit(final String jobId) { - log.info("Commit job {}", jobId); - final long startTimeMillis = System.currentTimeMillis(); - PipelineJobManager jobManager = new PipelineJobManager(this); - jobManager.stop(jobId); - dropCheckJobs(jobId); - MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId); - refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName()); - jobManager.drop(jobId); - log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis); - } - - /** - * Add migration source resources. - * - * @param contextKey context key - * @param propsMap data source pool properties map - */ - public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map propsMap) { - Map existDataSources = dataSourcePersistService.load(contextKey, getType()); - Collection duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F); - for (Entry entry : propsMap.entrySet()) { - if (existDataSources.containsKey(entry.getKey())) { - duplicateDataSourceNames.add(entry.getKey()); - } - } - ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () -> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames)); - Map result = new LinkedHashMap<>(existDataSources); - result.putAll(propsMap); - dataSourcePersistService.persist(contextKey, getType(), result); - } - - /** - * Drop migration source resources. 
- * - * @param contextKey context key - * @param resourceNames resource names - */ - public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection resourceNames) { - Map metaDataDataSource = dataSourcePersistService.load(contextKey, getType()); - List noExistResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList()); - ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () -> new UnregisterMigrationSourceStorageUnitException(noExistResources)); - for (String each : resourceNames) { - metaDataDataSource.remove(each); - } - dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource); - } - - /** - * Query migration source resources list. - * - * @param contextKey context key - * @return migration source resources - */ - public Collection> listMigrationSourceResources(final PipelineContextKey contextKey) { - Map propsMap = dataSourcePersistService.load(contextKey, getType()); - Collection> result = new ArrayList<>(propsMap.size()); - for (Entry entry : propsMap.entrySet()) { - String dataSourceName = entry.getKey(); - DataSourcePoolProperties value = entry.getValue(); - Collection props = new LinkedList<>(); - props.add(dataSourceName); - String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url")); - DatabaseType databaseType = DatabaseTypeFactory.get(url); - props.add(databaseType.getType()); - ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null); - props.add(connectionProps.getHostname()); - props.add(connectionProps.getPort()); - props.add(connectionProps.getCatalog()); - Map standardProps = value.getPoolPropertySynonyms().getStandardProperties(); - props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds")); - props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds")); - props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds")); - props.add(getStandardProperty(standardProps, "maxPoolSize")); - props.add(getStandardProperty(standardProps, "minPoolSize")); - props.add(getStandardProperty(standardProps, "readOnly")); - Map otherProps = value.getCustomProperties().getProperties(); - props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps)); - result.add(props); - } - return result; - } - - private String getStandardProperty(final Map standardProps, final String key) { - if (standardProps.containsKey(key) && null != standardProps.get(key)) { - return standardProps.get(key).toString(); - } - return ""; - } - - /** - * Refresh table metadata. 
- * - * @param jobId job id - * @param databaseName database name - */ - public void refreshTableMetadata(final String jobId, final String databaseName) { - // TODO use origin database name now, wait reloadDatabaseMetaData fix case-sensitive probelm - ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager(); - ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName); - contextManager.reloadDatabaseMetaData(database.getName()); - } - - @Override - public Class getJobClass() { - return MigrationJob.class; - } - @Override public String getType() { return "MIGRATION"; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java new file mode 100644 index 0000000000000..6e820b4af1f8c --- /dev/null +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; +import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; +import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils; +import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry; +import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; +import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils; +import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory; +import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; +import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; +import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; +import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; +import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; +import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder; +import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; +import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException; +import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException; +import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException; +import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; +import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; +import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; +import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; +import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob; +import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId; +import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker; +import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; +import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; +import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator; +import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext; +import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration; +import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper; +import org.apache.shardingsphere.infra.config.rule.RuleConfiguration; +import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties; +import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser; +import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; +import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader; +import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory; +import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; +import org.apache.shardingsphere.infra.datanode.DataNode; +import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties; +import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; +import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; +import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit; +import org.apache.shardingsphere.infra.util.json.JsonUtils; +import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration; +import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; +import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine; +import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement; +import org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry; +import org.apache.shardingsphere.mode.manager.ContextManager; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Migration job option. 
+ */ +@Slf4j +public final class MigrationJobOption implements TransmissionJobOption { + + private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService(); + + /** + * Create migration job configuration and start it. + * + * @param contextKey context key + * @param param create migration job parameter + * @return job id + */ + public String createJobAndStart(final PipelineContextKey contextKey, final MigrateTableStatement param) { + MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param)); + new PipelineJobManager(this).start(jobConfig); + return jobConfig.getJobId(); + } + + private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) { + YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration(); + result.setTargetDatabaseName(param.getTargetDatabaseName()); + Map metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION"); + Map> sourceDataNodes = new LinkedHashMap<>(); + Map configSources = new LinkedHashMap<>(); + List sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName) + .thenComparing(each -> DataNodeUtils.formatWithSchema(each.getSource()))).collect(Collectors.toList()); + YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); + for (SourceTargetEntry each : sourceTargetEntries) { + sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource()); + ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(), + () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName())); + String dataSourceName = each.getSource().getDataSourceName(); + if (configSources.containsKey(dataSourceName)) { + continue; + } + ShardingSpherePreconditions.checkState(metaDataDataSource.containsKey(dataSourceName), + () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. 
Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it.")); + Map sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName)); + StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps); + configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter())); + DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData(); + if (null == each.getSource().getSchemaName() && dialectDatabaseMetaData.isSchemaAvailable()) { + each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig)); + } + DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType(); + if (null == result.getSourceDatabaseType()) { + result.setSourceDatabaseType(sourceDatabaseType.getType()); + } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) { + throw new PipelineInvalidParameterException("Source storage units have different database types"); + } + } + result.setSources(configSources); + ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName()); + PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase); + result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter())); + result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType()); + List tablesFirstDataNodes = sourceDataNodes.entrySet().stream() + .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList()); + result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList())); + result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes)); + result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal()); + result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList())); + extendYamlJobConfiguration(contextKey, result); + return result; + } + + private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) { + YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration(); + result.setType(type); + result.setParameter(param); + return result; + } + + private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) { + Map> targetPoolProps = new HashMap<>(); + YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); + for (Entry entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) { + targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties())); + } + YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations()); + return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig); + } + + private 
YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map> yamlDataSources, final Collection rules) { + if (rules.isEmpty()) { + throw new NoAnyRuleExistsException(databaseName); + } + YamlRootConfiguration result = new YamlRootConfiguration(); + result.setDatabaseName(databaseName); + result.setDataSources(yamlDataSources); + result.setRules(new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules)); + return result; + } + + private Map buildTargetTableSchemaMap(final Map> sourceDataNodes) { + Map result = new LinkedHashMap<>(); + sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName())); + return result; + } + + @Override + public PipelineJobInfo getJobInfo(final String jobId) { + PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + List sourceTables = new LinkedList<>(); + new PipelineJobConfigurationManager(this).getJobConfiguration(jobId).getJobShardingDataNodes() + .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode))))); + return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables)); + } + + @Override + public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) { + YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig; + if (null == yamlJobConfig.getJobId()) { + config.setJobId(new MigrationJobId(contextKey, config.getJobShardingDataNodes()).marshal()); + } + } + + @SuppressWarnings("unchecked") + @Override + public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() { + return new YamlMigrationJobConfigurationSwapper(); + } + + @Override + public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) { + MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig; + IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator( + jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); + Collection createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); + Set targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()); + Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( + ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames); + ImporterConfiguration importerConfig = buildImporterConfiguration( + jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); + MigrationTaskConfiguration result = new MigrationTaskConfiguration( + incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig); + log.info("buildTaskConfiguration, result={}", result); + return result; + } + + private Collection buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { + Collection result = new LinkedList<>(); + for (JobDataNodeEntry each : 
jobConfig.getTablesFirstDataNodes().getEntries()) { + String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName()); + DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData(); + String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null; + DataNode dataNode = each.getDataNodes().get(0); + PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName()); + CreateTableConfiguration createTableConfig = new CreateTableConfiguration( + sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()), + jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName())); + result.add(createTableConfig); + } + log.info("buildCreateTableConfigurations, result={}", result); + return result; + } + + private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, + final Map> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) { + MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig); + JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm(); + int batchSize = pipelineProcessConfig.getWrite().getBatchSize(); + int retryTimes = jobConfig.getRetryTimes(); + int concurrency = jobConfig.getConcurrency(); + return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency); + } + + @Override + public MigrationProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) { + PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())); + return new MigrationProcessContext(jobConfig.getJobId(), processConfig); + } + + @Override + public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext, + final ConsistencyCheckJobItemProgressContext progressContext) { + return new MigrationDataConsistencyChecker((MigrationJobConfiguration) jobConfig, processContext, progressContext); + } + + @Override + public Optional getToBeStartDisabledNextJobType() { + return Optional.of("CONSISTENCY_CHECK"); + } + + @Override + public Optional getToBeStoppedPreviousJobType() { + return Optional.of("CONSISTENCY_CHECK"); + } + + private void dropCheckJobs(final String jobId) { + Collection checkJobIds = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().listCheckJobIds(jobId); + if (checkJobIds.isEmpty()) { + return; + } + for (String each : checkJobIds) { + try { + new PipelineJobManager(this).drop(each); + // CHECKSTYLE:OFF + } catch (final RuntimeException ex) { + // CHECKSTYLE:ON + log.info("drop check job failed, check job id: {}, error: {}", each, ex.getMessage()); + } + } + } + + private void cleanTempTableOnRollback(final String jobId) throws SQLException { + MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId); + PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType()); + 
+        TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
+        try (
+                PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
+                Connection connection = dataSource.getConnection()) {
+            for (String each : jobConfig.getTargetTableNames()) {
+                String targetSchemaName = mapping.getSchemaName(each);
+                String sql = pipelineSQLBuilder.buildDropSQL(targetSchemaName, each);
+                log.info("cleanTempTableOnRollback, targetSchemaName={}, targetTableName={}, sql={}", targetSchemaName, each, sql);
+                try (Statement statement = connection.createStatement()) {
+                    statement.execute(sql);
+                }
+            }
+        }
+    }
+
+    /**
+     * Add migration source resources.
+     *
+     * @param contextKey context key
+     * @param propsMap data source pool properties map
+     */
+    public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
+        Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
+        Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
+        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
+            if (existDataSources.containsKey(entry.getKey())) {
+                duplicateDataSourceNames.add(entry.getKey());
+            }
+        }
+        ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () -> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
+        Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
+        result.putAll(propsMap);
+        dataSourcePersistService.persist(contextKey, getType(), result);
+    }
+
+    /**
+     * Drop migration source resources.
+     *
+     * @param contextKey context key
+     * @param resourceNames resource names
+     */
+    public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
+        Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
+        List<String> noExistResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
+        ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () -> new UnregisterMigrationSourceStorageUnitException(noExistResources));
+        for (String each : resourceNames) {
+            metaDataDataSource.remove(each);
+        }
+        dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
+    }
+
+    /**
+     * Query migration source resources list.
+     *
+     * @param contextKey context key
+     * @return migration source resources
+     */
+    public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
+        Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
+        Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
+        for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
+            String dataSourceName = entry.getKey();
+            DataSourcePoolProperties value = entry.getValue();
+            Collection<Object> props = new LinkedList<>();
+            props.add(dataSourceName);
+            String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
+            DatabaseType databaseType = DatabaseTypeFactory.get(url);
+            props.add(databaseType.getType());
+            ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null);
+            props.add(connectionProps.getHostname());
+            props.add(connectionProps.getPort());
+            props.add(connectionProps.getCatalog());
+            Map<String, Object> standardProps = value.getPoolPropertySynonyms().getStandardProperties();
+            props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds"));
+            props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds"));
+            props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds"));
+            props.add(getStandardProperty(standardProps, "maxPoolSize"));
+            props.add(getStandardProperty(standardProps, "minPoolSize"));
+            props.add(getStandardProperty(standardProps, "readOnly"));
+            Map<String, Object> otherProps = value.getCustomProperties().getProperties();
+            props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps));
+            result.add(props);
+        }
+        return result;
+    }
+
+    private String getStandardProperty(final Map<String, Object> standardProps, final String key) {
+        if (standardProps.containsKey(key) && null != standardProps.get(key)) {
+            return standardProps.get(key).toString();
+        }
+        return "";
+    }
+
+    /**
+     * Refresh table metadata.
+     *
+     * @param jobId job id
+     * @param databaseName database name
+     */
+    public void refreshTableMetadata(final String jobId, final String databaseName) {
+        // TODO use origin database name for now; wait for reloadDatabaseMetaData to fix the case-sensitivity problem
+        ContextManager contextManager = PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager();
+        ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName);
+        contextManager.reloadDatabaseMetaData(database.getName());
+    }
+
+    @Override
+    public Class<MigrationJob> getJobClass() {
+        return MigrationJob.class;
+    }
+
+    @Override
+    public String getType() {
+        return "MIGRATION";
+    }
+}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
new file mode 100644
index 0000000000000..c89d6bd11b8be
--- /dev/null
+++ b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption
diff --git a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI
similarity index 100%
rename from kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI
rename to kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
index 0f3179f15644e..1bf9b0046eddf 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
@@ -19,9 +19,9 @@
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.distsql.statement.ral.queryable.ShowMigrationRuleStatement;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -40,7 +40,7 @@ public final class ShowMigrationRuleExecutor implements QueryableRALExecutor<ShowMigrationRuleStatement> {
     public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationRuleStatement sqlStatement) {
-        PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"))
+        PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION"))
                 .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY));
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));
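Reviewer note: the DistSQL handler above (and the updater below) follow the same consumer-side pattern after the rename. The following is a hypothetical condensed sketch, not part of this diff; class names are taken from the files in this PR, while the import paths (notably TypedSPILoader's package) are assumed from this branch's module layout.

// Hypothetical sketch: resolving the migration job option by type after the rename.
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

public final class MigrationProcessConfigurationExample {
    
    public static PipelineProcessConfiguration load() {
        // "MIGRATION" must match MigrationJobOption.getType(); the implementation is
        // discovered through the META-INF/services registration added in this PR.
        TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION");
        // TransmissionJobManager is now constructed from the option object instead of the former PipelineJobAPI.
        return new TransmissionJobManager(jobOption).showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY));
    }
}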
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
index 985278f2f603f..210da1a21d6fd 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
@@ -19,9 +19,9 @@
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterTransmissionRuleStatement;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -35,7 +35,7 @@ public final class AlterTransmissionRuleUpdater implements RALUpdater<AlterTransmissionRuleStatement> {
 jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index e38bf513ad4a6..55e7c4be872e5 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -39,6 +39,7 @@
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
@@ -89,6 +90,8 @@
 @StaticMockSettings(PipelineDistributedBarrier.class)
 class MigrationJobAPITest {
 
+    private static MigrationJobOption jobOption;
+
     private static MigrationJobAPI jobAPI;
 
     private static PipelineJobConfigurationManager jobConfigManager;
@@ -104,23 +107,24 @@ class MigrationJobAPITest {
     @BeforeAll
     static void beforeClass() {
         PipelineContextUtils.mockModeConfigAndContextManager();
+        jobOption = new MigrationJobOption();
         jobAPI = new MigrationJobAPI();
-        jobConfigManager = new PipelineJobConfigurationManager(jobAPI);
-        jobManager = new PipelineJobManager(jobAPI);
-        transmissionJobManager = new TransmissionJobManager(jobAPI);
-        jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+        jobConfigManager = new PipelineJobConfigurationManager(jobOption);
+        jobManager = new PipelineJobManager(jobOption);
+        transmissionJobManager = new TransmissionJobManager(jobOption);
+        jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
         String jdbcUrl = "jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
         databaseType = DatabaseTypeFactory.get(jdbcUrl);
         Map<String, Object> props = new HashMap<>();
         props.put("jdbcUrl", jdbcUrl);
         props.put("username", "root");
         props.put("password", "root");
-        jobAPI.addMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
+        jobOption.addMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
     }
 
     @AfterAll
     static void afterClass() {
-        jobAPI.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonList("ds_0"));
+        jobOption.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonList("ds_0"));
     }
 
     @Test
@@ -188,8 +192,8 @@ void assertDataConsistencyCheck() {
         initTableData(jobConfig);
         Optional<String> jobId = jobManager.start(jobConfig);
         assertTrue(jobId.isPresent());
-        Map<String, TableDataConsistencyCheckResult> checkResultMap = jobAPI.buildDataConsistencyChecker(
-                jobConfig, jobAPI.buildProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", null);
+        Map<String, TableDataConsistencyCheckResult> checkResultMap = jobOption.buildDataConsistencyChecker(
+                jobConfig, jobOption.buildProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", null);
         assertThat(checkResultMap.size(), is(1));
         String checkKey = "t_order";
         assertTrue(checkResultMap.get(checkKey).isMatched());
@@ -253,20 +257,20 @@ void assertAddMigrationSourceResources() {
     void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
         List<SourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", "t_order_1")
                 .map(each -> new SourceTargetEntry("logic_db", new DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
-        assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+        assertThrows(PipelineInvalidParameterException.class, () -> jobOption.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
     }
 
     @Test
     void assertCreateJobConfigFailedOnDataSourceNotExist() {
         List<SourceTargetEntry> sourceTargetEntries = Collections.singletonList(new SourceTargetEntry("logic_db", new DataNode("ds_not_exists", "t_order"), "t_order"));
-        assertThrows(PipelineInvalidParameterException.class, () -> jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+        assertThrows(PipelineInvalidParameterException.class, () -> jobOption.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
     }
 
     @Test
     void assertCreateJobConfig() throws SQLException {
         initIntPrimaryEnvironment();
         SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
-        String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
+        String jobId = jobOption.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
         MigrationJobConfiguration actual = jobConfigManager.getJobConfiguration(jobId);
         assertThat(actual.getTargetDatabaseName(), is("logic_db"));
         List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
@@ -294,7 +298,7 @@ private void initIntPrimaryEnvironment() throws SQLException {
 
     @Test
     void assertShowMigrationSourceResources() {
-        Collection<Collection<Object>> actual = jobAPI.listMigrationSourceResources(PipelineContextUtils.getContextKey());
+        Collection<Collection<Object>> actual = jobOption.listMigrationSourceResources(PipelineContextUtils.getContextKey());
         assertThat(actual.size(), is(1));
         Collection<Object> objects = actual.iterator().next();
         assertThat(objects.toArray()[0], is("ds_0"));
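Reviewer note: the test changes above exercise MigrationJobOption by constructing it directly. A hypothetical smoke test, not part of this PR, could additionally confirm that the new META-INF/services registration resolves through the SPI loader; it reuses the JUnit 5 and Hamcrest style already present in MigrationJobAPITest, and the TypedSPILoader import path is assumed.

// Hypothetical SPI-registration smoke test (not in this diff).
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;

class PipelineJobOptionSPILoadTest {
    
    @Test
    void assertMigrationJobOptionIsRegistered() {
        // Resolves MigrationJobOption via the service file added in this PR;
        // the lookup key is the value returned by MigrationJobOption.getType().
        PipelineJobOption actual = TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION");
        assertThat(actual.getType(), is("MIGRATION"));
    }
}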