From acd872e114c7c3202d644b04ae28e0605c02e488 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sun, 3 Dec 2023 22:14:16 +0800 Subject: [PATCH] Merge PipelineJobType and PipelineJobOption (#29270) * Merge PipelineJobType and PipelineJobOption * Merge PipelineJobType and PipelineJobOption --- .../common/job/type/PipelineJobType.java | 91 ++++++++++++- .../core/job/AbstractPipelineJob.java | 5 +- .../core/job/AbstractSimplePipelineJob.java | 2 +- .../core/job/option/PipelineJobOption.java | 125 ------------------ .../PipelineJobProgressPersistService.java | 4 +- .../PipelineJobConfigurationManager.java | 10 +- .../core/job/service/PipelineJobManager.java | 23 ++-- .../job/service/TransmissionJobManager.java | 10 +- .../task/runner/TransmissionTasksRunner.java | 11 +- .../common/job/type/FixtureJobType.java | 39 +++++- .../query/ShowStreamingJobStatusExecutor.java | 4 +- .../query/ShowStreamingListExecutor.java | 4 +- .../ShowMigrationCheckStatusExecutor.java | 4 +- .../query/ShowMigrationJobStatusExecutor.java | 4 +- .../query/ShowMigrationListExecutor.java | 4 +- .../update/CheckMigrationJobUpdater.java | 13 +- .../update/DropMigrationCheckUpdater.java | 4 +- .../update/StartMigrationCheckUpdater.java | 4 +- .../handler/update/StartMigrationUpdater.java | 4 +- .../update/StopMigrationCheckUpdater.java | 4 +- .../handler/update/StopMigrationUpdater.java | 6 +- .../data/pipeline/cdc/CDCJob.java | 7 +- .../data/pipeline/cdc/CDCJobOption.java | 79 ----------- .../data/pipeline/cdc/CDCJobType.java | 46 ++++++- .../data/pipeline/cdc/api/CDCJobAPI.java | 16 +-- .../cdc/core/prepare/CDCJobPreparer.java | 6 +- .../cdc/handler/CDCBackendHandler.java | 8 +- .../consistencycheck/ConsistencyCheckJob.java | 2 +- .../ConsistencyCheckJobOption.java | 71 ---------- .../ConsistencyCheckJobType.java | 40 +++++- .../api/ConsistencyCheckJobAPI.java | 10 +- .../task/ConsistencyCheckTasksRunner.java | 16 +-- .../scenario/migration/MigrationJob.java | 6 +- .../migration/MigrationJobOption.java | 92 ------------- .../scenario/migration/MigrationJobType.java | 59 ++++++++- .../migration/api/MigrationJobAPI.java | 11 +- .../MigrationDataConsistencyChecker.java | 4 +- .../prepare/MigrationJobPreparer.java | 8 +- .../AlterTransmissionRuleUpdater.java | 2 +- .../api/impl/ConsistencyCheckJobAPITest.java | 10 +- .../api/impl/MigrationJobAPITest.java | 21 +-- 41 files changed, 376 insertions(+), 513 deletions(-) delete mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java delete mode 100644 kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java delete mode 100644 kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java delete mode 100644 kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java index 4781e9c455269..77689ff3f376c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java @@ -17,9 +17,21 @@ package org.apache.shardingsphere.data.pipeline.common.job.type; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; +import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; +import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob; +import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper; import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI; +import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration; + +import java.util.Optional; /** * Pipeline job type. @@ -35,11 +47,82 @@ public interface PipelineJobType extends TypedSPI { String getCode(); /** - * Get job option. + * Get YAML pipeline job configuration swapper. + * + * @param type of YAML configuration + * @param type of pipeline job configuration + * @return YAML pipeline job configuration swapper + */ + YamlPipelineJobConfigurationSwapper getYamlJobConfigurationSwapper(); + + /** + * Get YAML pipeline job item progress swapper. + * + * @param type of pipeline job item progress + * @return YAML pipeline job item progress swapper + */ + YamlPipelineJobItemProgressSwapper getYamlJobItemProgressSwapper(); + + /** + * Get pipeline job class. + * + * @return pipeline job class + */ + Class getJobClass(); + + /** + * Whether to ignore to start disabled job when job item progress is finished. + * + * @return ignore to start disabled job when job item progress is finished or not + */ + default boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() { + return false; + } + + /** + * Get to be start disabled next job type. + * + * @return to be start disabled next job type + */ + default Optional getToBeStartDisabledNextJobType() { + return Optional.empty(); + } + + /** + * Get to be stopped previous job type. + * + * @return to be stopped previous job type + */ + default Optional getToBeStoppedPreviousJobType() { + return Optional.empty(); + } + + /** + * Whether to force no sharding when convert to job configuration POJO. + * + * @return without sharding or not + */ + default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() { + return false; + } + + /** + * Get pipeline job info. + * + * @param jobId job ID + * @return pipeline job info + */ + PipelineJobInfo getJobInfo(String jobId); + + /** + * Build pipeline data consistency checker. * - * @return job option + * @param jobConfig job configuration + * @param processContext process context + * @param progressContext consistency check job item progress context + * @return all logic tables check result */ - PipelineJobOption getOption(); + PipelineDataConsistencyChecker buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, TransmissionProcessContext processContext, ConsistencyCheckJobItemProgressContext progressContext); @Override String getType(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java index 8cf5e7ea051d2..95b83c64372b1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java @@ -27,7 +27,6 @@ import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier; import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener; @@ -56,7 +55,7 @@ public abstract class AbstractPipelineJob implements PipelineJob { private final String jobId; @Getter(AccessLevel.PROTECTED) - private final PipelineJobOption jobOption; + private final PipelineJobType jobType; private final AtomicBoolean stopping = new AtomicBoolean(false); @@ -66,7 +65,7 @@ public abstract class AbstractPipelineJob implements PipelineJob { protected AbstractPipelineJob(final String jobId) { this.jobId = jobId; - jobOption = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getOption(); + jobType = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java index 41ed6e7f77e4d..92a5329850d85 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java @@ -61,7 +61,7 @@ public void execute(final ShardingContext shardingContext) { // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON - processFailed(new PipelineJobManager(getJobOption()), jobId, shardingItem, ex); + processFailed(new PipelineJobManager(getJobType()), jobId, shardingItem, ex); throw ex; } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java deleted file mode 100644 index 6b3d4b71e1961..0000000000000 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.core.job.option; - -import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; -import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob; -import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; -import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper; -import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration; -import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper; -import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; -import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration; - -import java.util.Optional; - -/** - * Pipeline job option. - */ -@SingletonSPI -public interface PipelineJobOption { - - /** - * Get YAML pipeline job configuration swapper. - * - * @param type of YAML configuration - * @param type of pipeline job configuration - * @return YAML pipeline job configuration swapper - */ - YamlPipelineJobConfigurationSwapper getYamlJobConfigurationSwapper(); - - /** - * Get YAML pipeline job item progress swapper. - * - * @param type of pipeline job item progress - * @return YAML pipeline job item progress swapper - */ - YamlPipelineJobItemProgressSwapper getYamlJobItemProgressSwapper(); - - /** - * Get pipeline job class. - * - * @return pipeline job class - */ - Class getJobClass(); - - /** - * Whether to ignore to start disabled job when job item progress is finished. - * - * @return ignore to start disabled job when job item progress is finished or not - */ - default boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() { - return false; - } - - /** - * Get to be start disabled next job type. - * - * @return to be start disabled next job type - */ - default Optional getToBeStartDisabledNextJobType() { - return Optional.empty(); - } - - /** - * Get to be stopped previous job type. - * - * @return to be stopped previous job type - */ - default Optional getToBeStoppedPreviousJobType() { - return Optional.empty(); - } - - /** - * Whether to force no sharding when convert to job configuration POJO. - * - * @return without sharding or not - */ - default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() { - return false; - } - - /** - * Get pipeline job info. - * - * @param jobId job ID - * @return pipeline job info - */ - PipelineJobInfo getJobInfo(String jobId); - - /** - * Build pipeline data consistency checker. - * - * @param jobConfig job configuration - * @param processContext process context - * @param progressContext consistency check job item progress context - * @return all logic tables check result - */ - PipelineDataConsistencyChecker buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, TransmissionProcessContext processContext, ConsistencyCheckJobItemProgressContext progressContext); - - /** - * Get job type. - * - * @return job type - */ - String getType(); -} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java index 26c95849c910f..45c7afc675545 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java @@ -130,8 +130,8 @@ private static synchronized void persist(final String jobId, final int shardingI } persistContext.getHasNewEvents().set(false); long startTimeMillis = System.currentTimeMillis(); - new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getOption() - .getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get()); + new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, + PipelineJobIdUtils.parseJobType(jobId).getType()).getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get()); persistContext.getBeforePersistingProgressMillis().set(null); if (6 == ThreadLocalRandom.current().nextInt(100)) { log.info("persist, jobId={}, shardingItem={}, cost {} ms", jobId, shardingItem, System.currentTimeMillis() - startTimeMillis); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java index a6c49fcb9659b..5d4b88783082c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java @@ -19,9 +19,9 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; @@ -35,7 +35,7 @@ @RequiredArgsConstructor public final class PipelineJobConfigurationManager { - private final PipelineJobOption jobOption; + private final PipelineJobType jobType; /** * Get job configuration. @@ -46,7 +46,7 @@ public final class PipelineJobConfigurationManager { */ @SuppressWarnings("unchecked") public T getJobConfiguration(final String jobId) { - return (T) jobOption.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter()); + return (T) jobType.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter()); } /** @@ -58,8 +58,8 @@ public T getJobConfiguration(final String j public JobConfigurationPOJO convertToJobConfigurationPOJO(final PipelineJobConfiguration jobConfig) { JobConfigurationPOJO result = new JobConfigurationPOJO(); result.setJobName(jobConfig.getJobId()); - result.setShardingTotalCount(jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount()); - result.setJobParameter(YamlEngine.marshal(jobOption.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig))); + result.setShardingTotalCount(jobType.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount()); + result.setJobParameter(YamlEngine.marshal(jobType.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig))); String createTimeFormat = LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()); result.getProps().setProperty("create_time", createTimeFormat); result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index a00564b5bbc47..2d98b49be6a1e 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -32,7 +32,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; @@ -52,7 +51,7 @@ @Slf4j public final class PipelineJobManager { - private final PipelineJobOption jobOption; + private final PipelineJobType jobType; /** * Start job. @@ -67,8 +66,8 @@ public void start(final PipelineJobConfiguration jobConfig) { log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId); return; } - governanceFacade.getJobFacade().getJob().create(jobId, jobOption.getJobClass()); - governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig)); + governanceFacade.getJobFacade().getJob().create(jobId, jobType.getJobClass()); + governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobType).convertToJobConfigurationPOJO(jobConfig)); } /** @@ -77,15 +76,15 @@ public void start(final PipelineJobConfiguration jobConfig) { * @param jobId job id */ public void resume(final String jobId) { - if (jobOption.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) { - Optional jobItemProgress = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()).getProgress(jobId, 0); + if (jobType.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) { + Optional jobItemProgress = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()).getProgress(jobId, 0); if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) { log.info("job status is FINISHED, ignore, jobId={}", jobId); return; } } startCurrentDisabledJob(jobId); - jobOption.getToBeStartDisabledNextJobType().ifPresent(optional -> startNextDisabledJob(jobId, optional)); + jobType.getToBeStartDisabledNextJobType().ifPresent(optional -> startNextDisabledJob(jobId, optional)); } @@ -108,7 +107,7 @@ private void startCurrentDisabledJob(final String jobId) { private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) { PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional -> { try { - new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStartDisabledNextJobType).getOption()).resume(optional); + new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStartDisabledNextJobType)).resume(optional); // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON @@ -123,14 +122,14 @@ private void startNextDisabledJob(final String jobId, final String toBeStartDisa * @param jobId job id */ public void stop(final String jobId) { - jobOption.getToBeStoppedPreviousJobType().ifPresent(optional -> stopPreviousJob(jobId, optional)); + jobType.getToBeStoppedPreviousJobType().ifPresent(optional -> stopPreviousJob(jobId, optional)); stopCurrentJob(jobId); } private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) { PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().findLatestCheckJobId(jobId).ifPresent(optional -> { try { - new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStoppedPreviousJobType).getOption()).stop(optional); + new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStoppedPreviousJobType)).stop(optional); // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON @@ -175,8 +174,8 @@ public void drop(final String jobId) { public List getJobInfos(final PipelineContextKey contextKey) { try { return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream() - .filter(each -> !each.getJobName().startsWith("_") && jobOption.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType())) - .map(each -> jobOption.getJobInfo(each.getJobName())).collect(Collectors.toList()); + .filter(each -> !each.getJobName().startsWith("_") && jobType.getType().equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType())) + .map(each -> jobType.getJobInfo(each.getJobName())).collect(Collectors.toList()); } catch (final UnsupportedOperationException ex) { return Collections.emptyList(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java index 842a84f2a6e14..0be041dafd0b1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java @@ -21,11 +21,11 @@ import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import java.util.Collection; @@ -43,7 +43,7 @@ @RequiredArgsConstructor public final class TransmissionJobManager { - private final PipelineJobOption jobOption; + private final PipelineJobType jobType; /** * Get job infos. @@ -52,11 +52,11 @@ public final class TransmissionJobManager { * @return job item infos */ public Collection getJobItemInfos(final String jobId) { - PipelineJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId); + PipelineJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId); long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0")); Map jobProgress = getJobProgress(jobConfig); List result = new LinkedList<>(); - PipelineJobInfo jobInfo = jobOption.getJobInfo(jobId); + PipelineJobInfo jobInfo = jobType.getJobInfo(jobId); for (Entry entry : jobProgress.entrySet()) { int shardingItem = entry.getKey(); TransmissionJobItemProgress jobItemProgress = entry.getValue(); @@ -88,7 +88,7 @@ private static int getInventoryFinishedPercentage(final TransmissionJobItemProgr * @return each sharding item progress */ public Map getJobProgress(final PipelineJobConfiguration jobConfig) { - PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); + PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()); String jobId = jobConfig.getJobId(); JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java index bd4fca50425aa..30b6dd5351ce3 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java @@ -29,7 +29,6 @@ import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector; import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; @@ -57,7 +56,7 @@ public class TransmissionTasksRunner implements PipelineTasksRunner { private final Collection incrementalTasks; - private final PipelineJobOption jobOption; + private final PipelineJobType jobType; private final PipelineJobManager jobManager; @@ -67,9 +66,9 @@ public TransmissionTasksRunner(final TransmissionJobItemContext jobItemContext) this.jobItemContext = jobItemContext; inventoryTasks = jobItemContext.getInventoryTasks(); incrementalTasks = jobItemContext.getIncrementalTasks(); - jobOption = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption(); - jobManager = new PipelineJobManager(jobOption); - jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); + jobType = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()); + jobManager = new PipelineJobManager(jobType); + jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()); } @Override @@ -90,7 +89,7 @@ public void start() { if (jobItemContext.isStopping()) { return; } - new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption() + new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()) .getYamlJobItemProgressSwapper()).persistProgress(jobItemContext); if (PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) { log.info("All inventory tasks finished."); diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java index d4de294616fc4..f2d143a71f6ce 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java @@ -17,9 +17,17 @@ package org.apache.shardingsphere.data.pipeline.common.job.type; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; - -import static org.mockito.Mockito.mock; +import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; +import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob; +import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper; +import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration; /** * Fixture job type. @@ -32,8 +40,29 @@ public String getCode() { } @Override - public PipelineJobOption getOption() { - return mock(PipelineJobOption.class); + public YamlPipelineJobConfigurationSwapper getYamlJobConfigurationSwapper() { + return null; + } + + @Override + public YamlPipelineJobItemProgressSwapper getYamlJobItemProgressSwapper() { + return null; + } + + @Override + public Class getJobClass() { + return null; + } + + @Override + public PipelineJobInfo getJobInfo(final String jobId) { + return null; + } + + @Override + public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, + final TransmissionProcessContext processContext, final ConsistencyCheckJobItemProgressContext progressContext) { + return null; } @Override diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java index 7bb2e0f434824..223d8c5a784db 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.distsql.handler.query; import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement; -import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption; +import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType; import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; @@ -37,7 +37,7 @@ public final class ShowStreamingJobStatusExecutor implements QueryableRALExecuto @Override public Collection getRows(final ShowStreamingStatusStatement sqlStatement) { - Collection jobItemInfos = new TransmissionJobManager(new CDCJobOption()).getJobItemInfos(sqlStatement.getJobId()); + Collection jobItemInfos = new TransmissionJobManager(new CDCJobType()).getJobItemInfos(sqlStatement.getJobId()); long currentTimeMillis = System.currentTimeMillis(); return jobItemInfos.stream().map(each -> getRow(each, currentTimeMillis)).collect(Collectors.toList()); } diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java index 63c52219df3ff..c69d02d46783e 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingListExecutor.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.distsql.handler.query; import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement; -import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption; +import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; @@ -35,7 +35,7 @@ */ public final class ShowStreamingListExecutor implements QueryableRALExecutor { - private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new CDCJobOption()); + private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new CDCJobType()); @Override public Collection getRows(final ShowStreamingListStatement sqlStatement) { diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java index 4ad92f6161e85..b63898a803316 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationCheckStatusExecutor.java @@ -18,8 +18,8 @@ package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.query; import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo; +import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI; -import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow; import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckStatusStatement; @@ -35,7 +35,7 @@ */ public final class ShowMigrationCheckStatusExecutor implements QueryableRALExecutor { - private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobOption()); + private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobType()); @Override public Collection getRows(final ShowMigrationCheckStatusStatement sqlStatement) { diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java index 2c518b1e648a6..38f2bcdf217ed 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java @@ -20,7 +20,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption; +import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow; import org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement; @@ -37,7 +37,7 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto @Override public Collection getRows(final ShowMigrationStatusStatement sqlStatement) { - Collection jobItemInfos = new TransmissionJobManager(new MigrationJobOption()).getJobItemInfos(sqlStatement.getJobId()); + Collection jobItemInfos = new TransmissionJobManager(new MigrationJobType()).getJobItemInfos(sqlStatement.getJobId()); long currentTimeMillis = System.currentTimeMillis(); return jobItemInfos.stream().map(each -> getRow(each, currentTimeMillis)).collect(Collectors.toList()); } diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationListExecutor.java index 156dae257b016..a3e9f8cffffea 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationListExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/query/ShowMigrationListExecutor.java @@ -19,7 +19,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption; +import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow; @@ -34,7 +34,7 @@ */ public final class ShowMigrationListExecutor implements QueryableRALExecutor { - private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new MigrationJobOption()); + private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new MigrationJobType()); @Override public Collection getRows(final ShowMigrationListStatement sqlStatement) { diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/CheckMigrationJobUpdater.java index 5b669b4ba18ff..cc0b69e236f36 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/CheckMigrationJobUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/CheckMigrationJobUpdater.java @@ -17,14 +17,15 @@ package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; +import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI; -import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.CreateConsistencyCheckJobParameter; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption; +import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.distsql.segment.AlgorithmSegment; @@ -39,9 +40,9 @@ */ public final class CheckMigrationJobUpdater implements RALUpdater { - private final ConsistencyCheckJobAPI checkJobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobOption()); + private final ConsistencyCheckJobAPI checkJobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobType()); - private final MigrationJobOption migrationJobOption = new MigrationJobOption(); + private final PipelineJobType migrationJobType = new MigrationJobType(); @Override public void executeUpdate(final String databaseName, final CheckMigrationStatement sqlStatement) throws SQLException { @@ -49,13 +50,13 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName(); Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps(); String jobId = sqlStatement.getJobId(); - MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(migrationJobOption).getJobConfiguration(jobId); + MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(migrationJobType).getJobConfiguration(jobId); verifyInventoryFinished(jobConfig); checkJobAPI.start(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType())); } private void verifyInventoryFinished(final MigrationJobConfiguration jobConfig) { - TransmissionJobManager transmissionJobManager = new TransmissionJobManager(migrationJobOption); + TransmissionJobManager transmissionJobManager = new TransmissionJobManager(migrationJobType); ShardingSpherePreconditions.checkState(PipelineJobProgressDetector.isInventoryFinished(jobConfig.getJobShardingCount(), transmissionJobManager.getJobProgress(jobConfig).values()), () -> new PipelineInvalidParameterException("Inventory is not finished.")); } diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/DropMigrationCheckUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/DropMigrationCheckUpdater.java index 8b820291d38ae..da0690d1538cf 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/DropMigrationCheckUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/DropMigrationCheckUpdater.java @@ -17,8 +17,8 @@ package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update; +import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI; -import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.migration.distsql.statement.DropMigrationCheckStatement; @@ -27,7 +27,7 @@ */ public final class DropMigrationCheckUpdater implements RALUpdater { - private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobOption()); + private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobType()); @Override public void executeUpdate(final String databaseName, final DropMigrationCheckStatement sqlStatement) { diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationCheckUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationCheckUpdater.java index b34f4274b332d..189eec1223764 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationCheckUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationCheckUpdater.java @@ -17,8 +17,8 @@ package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update; +import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI; -import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.migration.distsql.statement.StartMigrationCheckStatement; @@ -27,7 +27,7 @@ */ public final class StartMigrationCheckUpdater implements RALUpdater { - private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobOption()); + private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobType()); @Override public void executeUpdate(final String databaseName, final StartMigrationCheckStatement sqlStatement) { diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationUpdater.java index 209b62587f333..816397e0c5229 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StartMigrationUpdater.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption; +import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement; @@ -27,7 +27,7 @@ */ public final class StartMigrationUpdater implements RALUpdater { - private final PipelineJobManager jobManager = new PipelineJobManager(new MigrationJobOption()); + private final PipelineJobManager jobManager = new PipelineJobManager(new MigrationJobType()); @Override public void executeUpdate(final String databaseName, final StartMigrationStatement sqlStatement) { diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationCheckUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationCheckUpdater.java index c809ecb27430d..e305952408273 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationCheckUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationCheckUpdater.java @@ -17,8 +17,8 @@ package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update; +import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.ConsistencyCheckJobAPI; -import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.migration.distsql.statement.StopMigrationCheckStatement; @@ -27,7 +27,7 @@ */ public final class StopMigrationCheckUpdater implements RALUpdater { - private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobOption()); + private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI(new ConsistencyCheckJobType()); @Override public void executeUpdate(final String databaseName, final StopMigrationCheckStatement sqlStatement) { diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationUpdater.java index fa11b738eb4eb..8dea2425a2c2e 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/migration/distsql/handler/update/StopMigrationUpdater.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.migration.distsql.handler.update; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption; +import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement; @@ -27,9 +27,7 @@ */ public final class StopMigrationUpdater implements RALUpdater { - private final MigrationJobOption jobOption = new MigrationJobOption(); - - private final PipelineJobManager jobManager = new PipelineJobManager(jobOption); + private final PipelineJobManager jobManager = new PipelineJobManager(new MigrationJobType()); @Override public void executeUpdate(final String databaseName, final StopMigrationStatement sqlStatement) { diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java index 5da8e693eaa74..3cf6df2f069ab 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java @@ -46,6 +46,7 @@ import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor; @@ -85,11 +86,11 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob { @Getter private final PipelineSink sink; - private final CDCJobOption jobOption = new CDCJobOption(); + private final PipelineJobType jobType = TypedSPILoader.getService(PipelineJobType.class, "STREAMING"); private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING"); - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()); private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); @@ -134,7 +135,7 @@ public void execute(final ShardingContext shardingContext) { private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) { Optional initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem); PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( - processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobOption.getType())); + processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobType.getType())); TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig); CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig()); return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java deleted file mode 100644 index d2e6854e7554b..0000000000000 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.cdc; - -import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper; -import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; -import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper; -import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; -import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; -import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; - -/** - * CDC job option. - */ -@Slf4j -public final class CDCJobOption implements PipelineJobOption { - - @SuppressWarnings("unchecked") - @Override - public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() { - return new YamlCDCJobConfigurationSwapper(); - } - - @SuppressWarnings("unchecked") - @Override - public YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() { - return new YamlTransmissionJobItemProgressSwapper(); - } - - @Override - public Class getJobClass() { - return CDCJob.class; - } - - @Override - public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() { - return true; - } - - @Override - public PipelineJobInfo getJobInfo(final String jobId) { - PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); - CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId); - return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames())); - } - - @Override - public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext, - final ConsistencyCheckJobItemProgressContext progressContext) { - throw new UnsupportedOperationException(); - } - - @Override - public String getType() { - return "STREAMING"; - } -} diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java index 654f878dca231..509f24e6c2007 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobType.java @@ -17,8 +17,18 @@ package org.apache.shardingsphere.data.pipeline.cdc; +import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; +import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper; import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; /** * CDC job type. @@ -30,9 +40,39 @@ public String getCode() { return "03"; } + @SuppressWarnings("unchecked") @Override - public PipelineJobOption getOption() { - return new CDCJobOption(); + public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() { + return new YamlCDCJobConfigurationSwapper(); + } + + @SuppressWarnings("unchecked") + @Override + public YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() { + return new YamlTransmissionJobItemProgressSwapper(); + } + + @Override + public Class getJobClass() { + return CDCJob.class; + } + + @Override + public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() { + return true; + } + + @Override + public PipelineJobInfo getJobInfo(final String jobId) { + PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(new CDCJobType()).getJobConfiguration(jobId); + return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames())); + } + + @Override + public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext, + final ConsistencyCheckJobItemProgressContext progressContext) { + throw new UnsupportedOperationException(); } @Override diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java index 71c58442b9bfd..d24cbcc16122a 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java @@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.CDCJob; import org.apache.shardingsphere.data.pipeline.cdc.CDCJobId; -import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption; +import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration.YamlSinkConfiguration; @@ -83,7 +83,7 @@ @Slf4j public final class CDCJobAPI implements TransmissionJobAPI { - private final CDCJobOption jobOption; + private final CDCJobType jobType; private final PipelineJobManager jobManager; @@ -96,9 +96,9 @@ public final class CDCJobAPI implements TransmissionJobAPI { private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper; public CDCJobAPI() { - jobOption = new CDCJobOption(); - jobManager = new PipelineJobManager(jobOption); - jobConfigManager = new PipelineJobConfigurationManager(jobOption); + jobType = new CDCJobType(); + jobManager = new PipelineJobManager(jobType); + jobConfigManager = new PipelineJobConfigurationManager(jobType); dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine(); pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper(); @@ -121,7 +121,7 @@ public String create(final StreamDataParameter param, final CDCSinkType sinkType if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobConfig.getJobId())) { log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId()); } else { - governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), jobOption.getJobClass()); + governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), jobType.getJobClass()); JobConfigurationPOJO jobConfigPOJO = jobConfigManager.convertToJobConfigurationPOJO(jobConfig); jobConfigPOJO.setDisabled(true); governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO); @@ -169,7 +169,7 @@ private ShardingSpherePipelineDataSourceConfiguration getDataSourceConfiguration private void initIncrementalPosition(final CDCJobConfiguration jobConfig) { String jobId = jobConfig.getJobId(); - PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); + PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()); try (PipelineDataSourceManager pipelineDataSourceManager = new DefaultPipelineDataSourceManager()) { for (int i = 0; i < jobConfig.getJobShardingCount(); i++) { if (jobItemManager.getProgress(jobId, i).isPresent()) { @@ -178,7 +178,7 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) { IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames())); TransmissionJobItemProgress jobItemProgress = getTransmissionJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext); PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getProcess().persist( - jobId, i, YamlEngine.marshal(jobOption.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress))); + jobId, i, YamlEngine.marshal(jobType.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress))); } } catch (final SQLException ex) { throw new PrepareJobWithGetBinlogPositionException(jobId, ex); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java index 4cb7e57ea51ca..a18836b07932e 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.prepare; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption; +import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType; import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext; import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair; @@ -67,9 +67,7 @@ @Slf4j public final class CDCJobPreparer { - private final CDCJobOption jobAPI = new CDCJobOption(); - - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(new CDCJobType().getYamlJobItemProgressSwapper()); /** * Do prepare work. diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java index 04ab6e3cb5277..6270c145d80ef 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java @@ -21,8 +21,9 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelId; import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.cdc.CDCJob; +import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType; import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI; -import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption; import org.apache.shardingsphere.data.pipeline.cdc.api.StreamDataParameter; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType; @@ -31,7 +32,6 @@ import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter; import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager; import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink; -import org.apache.shardingsphere.data.pipeline.cdc.CDCJob; import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper; import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException; import org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException; @@ -48,8 +48,8 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType; @@ -75,7 +75,7 @@ public final class CDCBackendHandler { private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING"); - private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(new CDCJobOption()); + private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(new CDCJobType()); /** * Get database name by job ID. diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java index f3056e72746a1..3382e32690061 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java @@ -45,7 +45,7 @@ public ConsistencyCheckJob(final String jobId) { @Override public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final ShardingContext shardingContext) { ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter()); - PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getJobOption().getYamlJobItemProgressSwapper()); + PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper()); Optional jobItemProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingContext.getShardingItem()); return new ConsistencyCheckJobItemContext(jobConfig, shardingContext.getShardingItem(), JobStatus.RUNNING, jobItemProgress.orElse(null)); } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java deleted file mode 100644 index d5a1caaf312c5..0000000000000 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobOption.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck; - -import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; -import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper; -import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; -import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper; - -/** - * Consistency check job option. - */ -public final class ConsistencyCheckJobOption implements PipelineJobOption { - - @SuppressWarnings("unchecked") - @Override - public YamlConsistencyCheckJobConfigurationSwapper getYamlJobConfigurationSwapper() { - return new YamlConsistencyCheckJobConfigurationSwapper(); - } - - @SuppressWarnings("unchecked") - @Override - public YamlConsistencyCheckJobItemProgressSwapper getYamlJobItemProgressSwapper() { - return new YamlConsistencyCheckJobItemProgressSwapper(); - } - - @Override - public Class getJobClass() { - return ConsistencyCheckJob.class; - } - - @Override - public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() { - return true; - } - - @Override - public PipelineJobInfo getJobInfo(final String jobId) { - throw new UnsupportedOperationException(); - } - - @Override - public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, - final TransmissionProcessContext processContext, final ConsistencyCheckJobItemProgressContext progressContext) { - return null; - } - - @Override - public String getType() { - return "CONSISTENCY_CHECK"; - } -} diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java index 10343fb846cf5..2d096a2d0d6c6 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java @@ -17,8 +17,14 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck; +import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; +import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper; import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; +import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper; /** * Consistency check job type. @@ -30,9 +36,37 @@ public String getCode() { return "02"; } + @SuppressWarnings("unchecked") @Override - public PipelineJobOption getOption() { - return new ConsistencyCheckJobOption(); + public YamlConsistencyCheckJobConfigurationSwapper getYamlJobConfigurationSwapper() { + return new YamlConsistencyCheckJobConfigurationSwapper(); + } + + @SuppressWarnings("unchecked") + @Override + public YamlConsistencyCheckJobItemProgressSwapper getYamlJobItemProgressSwapper() { + return new YamlConsistencyCheckJobItemProgressSwapper(); + } + + @Override + public Class getJobClass() { + return ConsistencyCheckJob.class; + } + + @Override + public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() { + return true; + } + + @Override + public PipelineJobInfo getJobInfo(final String jobId) { + throw new UnsupportedOperationException(); + } + + @Override + public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, + final TransmissionProcessContext processContext, final ConsistencyCheckJobItemProgressContext progressContext) { + return null; } @Override diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java index 4c1018a7ff1e5..e4b7f31ebea5d 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/ConsistencyCheckJobAPI.java @@ -35,7 +35,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId; -import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption; +import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper; @@ -72,10 +72,10 @@ public final class ConsistencyCheckJobAPI { private final PipelineJobItemManager jobItemManager; - public ConsistencyCheckJobAPI(final ConsistencyCheckJobOption jobOption) { - progressSwapper = jobOption.getYamlJobItemProgressSwapper(); - jobManager = new PipelineJobManager(jobOption); - jobConfigManager = new PipelineJobConfigurationManager(jobOption); + public ConsistencyCheckJobAPI(final ConsistencyCheckJobType jobType) { + progressSwapper = jobType.getYamlJobItemProgressSwapper(); + jobManager = new PipelineJobManager(jobType); + jobConfigManager = new PipelineJobConfigurationManager(jobType); jobItemManager = new PipelineJobItemManager<>(progressSwapper); } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 35accd7c08d24..0493794402143 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -34,13 +34,12 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; -import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption; +import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; @@ -56,11 +55,11 @@ @Slf4j public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner { - private final ConsistencyCheckJobOption jobAPI = new ConsistencyCheckJobOption(); + private final PipelineJobType jobType = new ConsistencyCheckJobType(); - private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI); + private final PipelineJobManager jobManager = new PipelineJobManager(jobType); - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()); private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); @@ -90,7 +89,7 @@ public void start() { if (jobItemContext.isStopping()) { return; } - new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption() + new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()) .getYamlJobItemProgressSwapper()).persistProgress(jobItemContext); CompletableFuture future = jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor); ExecuteEngine.trigger(Collections.singletonList(future), new CheckExecuteCallback()); @@ -108,12 +107,11 @@ private final class CheckPipelineLifecycleRunnable extends AbstractPipelineLifec protected void runBlocking() { jobItemManager.persistProgress(jobItemContext); PipelineJobType jobType = PipelineJobIdUtils.parseJobType(parentJobId); - PipelineJobOption jobOption = TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption(); - PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId); + PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobType).getJobConfiguration(parentJobId); try { PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()), jobType.getType())); - PipelineDataConsistencyChecker checker = jobOption.buildDataConsistencyChecker( + PipelineDataConsistencyChecker checker = jobType.buildDataConsistencyChecker( parentJobConfig, new TransmissionProcessContext(parentJobConfig.getJobId(), processConfig), jobItemContext.getProgressContext()); consistencyChecker.set(checker); Map checkResultMap = checker.check(checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps()); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java index c405924b26612..320ebb5db5846 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java @@ -68,9 +68,7 @@ @Slf4j public final class MigrationJob extends AbstractSimplePipelineJob { - private final MigrationJobOption jobOption = new MigrationJobOption(); - - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper()); private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); @@ -89,7 +87,7 @@ protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingC MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter()); Optional initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem); PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( - processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobOption.getType())); + processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "MIGRATION")); TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig); MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig()); return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java deleted file mode 100644 index 31e36e2879be8..0000000000000 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.scenario.migration; - -import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; -import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils; -import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper; -import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; -import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; -import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; -import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker; -import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; -import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.Optional; - -/** - * Migration job option. - */ -@Slf4j -public final class MigrationJobOption implements PipelineJobOption { - - @SuppressWarnings("unchecked") - @Override - public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() { - return new YamlMigrationJobConfigurationSwapper(); - } - - @SuppressWarnings("unchecked") - @Override - public YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() { - return new YamlTransmissionJobItemProgressSwapper(); - } - - @Override - public Class getJobClass() { - return MigrationJob.class; - } - - @Override - public Optional getToBeStartDisabledNextJobType() { - return Optional.of("CONSISTENCY_CHECK"); - } - - @Override - public Optional getToBeStoppedPreviousJobType() { - return Optional.of("CONSISTENCY_CHECK"); - } - - @Override - public PipelineJobInfo getJobInfo(final String jobId) { - PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); - Collection sourceTables = new LinkedList<>(); - new PipelineJobConfigurationManager(this).getJobConfiguration(jobId).getJobShardingDataNodes() - .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode))))); - return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables)); - } - - @Override - public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext, - final ConsistencyCheckJobItemProgressContext progressContext) { - return new MigrationDataConsistencyChecker((MigrationJobConfiguration) jobConfig, processContext, progressContext); - } - - @Override - public String getType() { - return "MIGRATION"; - } -} diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java index c3a26100194bd..fc84d389aef9a 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java @@ -17,8 +17,24 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration; +import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; +import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils; +import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper; import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; +import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker; +import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; +import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.Optional; /** * Migration job type. @@ -30,9 +46,46 @@ public String getCode() { return "01"; } + @SuppressWarnings("unchecked") + @Override + public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() { + return new YamlMigrationJobConfigurationSwapper(); + } + + @SuppressWarnings("unchecked") + @Override + public YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() { + return new YamlTransmissionJobItemProgressSwapper(); + } + + @Override + public Class getJobClass() { + return MigrationJob.class; + } + + @Override + public Optional getToBeStartDisabledNextJobType() { + return Optional.of("CONSISTENCY_CHECK"); + } + + @Override + public Optional getToBeStoppedPreviousJobType() { + return Optional.of("CONSISTENCY_CHECK"); + } + + @Override + public PipelineJobInfo getJobInfo(final String jobId) { + PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + Collection sourceTables = new LinkedList<>(); + new PipelineJobConfigurationManager(new MigrationJobType()).getJobConfiguration(jobId).getJobShardingDataNodes() + .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode))))); + return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables)); + } + @Override - public PipelineJobOption getOption() { - return new MigrationJobOption(); + public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext, + final ConsistencyCheckJobItemProgressContext progressContext) { + return new MigrationDataConsistencyChecker((MigrationJobConfiguration) jobConfig, processContext, progressContext); } @Override diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java index 0da06c43777c1..45e5071185bde 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java @@ -41,12 +41,11 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption; +import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper; @@ -100,9 +99,9 @@ public final class MigrationJobAPI implements TransmissionJobAPI { private final PipelineDataSourcePersistService dataSourcePersistService; public MigrationJobAPI() { - PipelineJobOption jobOption = new MigrationJobOption(); - jobManager = new PipelineJobManager(jobOption); - jobConfigManager = new PipelineJobConfigurationManager(jobOption); + PipelineJobType jobType = new MigrationJobType(); + jobManager = new PipelineJobManager(jobType); + jobConfigManager = new PipelineJobConfigurationManager(jobType); dataSourcePersistService = new PipelineDataSourcePersistService(); } @@ -319,7 +318,7 @@ private void dropCheckJobs(final String jobId) { } private void cleanTempTableOnRollback(final String jobId) throws SQLException { - MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class, getType()).getOption()).getJobConfiguration(jobId); + MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class, getType())).getJobConfiguration(jobId); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType()); TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); try ( diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java index 125c843304217..c4dcd35b7fb30 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java @@ -43,7 +43,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker; import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption; +import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.infra.datanode.DataNode; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; @@ -99,7 +99,7 @@ public Map check(final String algorithm } private long getRecordsCount() { - Map jobProgress = new TransmissionJobManager(new MigrationJobOption()).getJobProgress(jobConfig); + Map jobProgress = new TransmissionJobManager(new MigrationJobType()).getJobProgress(jobConfig); return jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getProcessedRecordsCount).sum(); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java index 9d64551061c21..4afd70afd908c 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java @@ -28,9 +28,9 @@ import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine; import org.apache.shardingsphere.data.pipeline.common.ingest.channel.PipelineChannelCreator; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressListener; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader; import org.apache.shardingsphere.data.pipeline.common.spi.ingest.dumper.IncrementalDumperCreator; @@ -54,7 +54,7 @@ import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption; +import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext; @@ -81,9 +81,9 @@ @Slf4j public final class MigrationJobPreparer { - private final MigrationJobOption jobOption = new MigrationJobOption(); + private final MigrationJobType jobType = new MigrationJobType(); - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()); /** * Do prepare work. diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java index fff58c7361c6a..cdc5c106c2086 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java @@ -37,7 +37,7 @@ public final class AlterTransmissionRuleUpdater implements RALUpdater jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); + private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()); private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new YamlMigrationJobConfigurationSwapper(); @@ -70,7 +70,7 @@ void assertCreateJobConfig() { String parentJobId = parentJobConfig.getJobId(); String checkJobId = jobAPI.start(new CreateConsistencyCheckJobParameter(parentJobId, null, null, parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType())); - ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(checkJobId); + ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobConfigurationManager(jobType).getJobConfiguration(checkJobId); int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE; String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal(); assertThat(checkJobConfig.getJobId(), is(expectCheckJobId)); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 46f0812cdc16a..a06584cdf043e 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -30,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; @@ -37,15 +38,15 @@ import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; +import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; +import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext; import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator; @@ -96,7 +97,7 @@ @StaticMockSettings(PipelineDistributedBarrier.class) class MigrationJobAPITest { - private static MigrationJobOption jobOption; + private static PipelineJobType jobType; private static MigrationJobAPI jobAPI; @@ -113,12 +114,12 @@ class MigrationJobAPITest { @BeforeAll static void beforeClass() { PipelineContextUtils.mockModeConfigAndContextManager(); - jobOption = new MigrationJobOption(); + jobType = new MigrationJobType(); jobAPI = (MigrationJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "MIGRATION"); - jobConfigManager = new PipelineJobConfigurationManager(jobOption); - jobManager = new PipelineJobManager(jobOption); - transmissionJobManager = new TransmissionJobManager(jobOption); - jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); + jobConfigManager = new PipelineJobConfigurationManager(jobType); + jobManager = new PipelineJobManager(jobType); + transmissionJobManager = new TransmissionJobManager(jobType); + jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()); String jdbcUrl = "jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL"; databaseType = DatabaseTypeFactory.get(jdbcUrl); Map props = new HashMap<>(); @@ -195,9 +196,9 @@ void assertDataConsistencyCheck() { initTableData(jobConfig); jobManager.start(jobConfig); PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( - new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobOption.getType())); + new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobType.getType())); TransmissionProcessContext processContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig); - Map checkResultMap = jobOption.buildDataConsistencyChecker( + Map checkResultMap = jobType.buildDataConsistencyChecker( jobConfig, processContext, new ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0, "H2")).check("FIXTURE", null); assertThat(checkResultMap.size(), is(1)); String checkKey = "t_order";