diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java index 88ed08a492225..9f8039a30c1e3 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.common.job; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; /** * Pipeline job id. @@ -30,7 +30,7 @@ public interface PipelineJobId { * * @return type */ - JobType getJobType(); + PipelineJobType getJobType(); /** * Get format version. diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java index cdd230ebf032d..d7fce319ffab6 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java @@ -33,10 +33,10 @@ @Slf4j public final class JobCodeRegistry { - private static final Map JOB_CODE_AND_TYPE_MAP = new HashMap<>(); + private static final Map JOB_CODE_AND_TYPE_MAP = new HashMap<>(); static { - for (JobType each : ShardingSphereServiceLoader.getServiceInstances(JobType.class)) { + for (PipelineJobType each : ShardingSphereServiceLoader.getServiceInstances(PipelineJobType.class)) { Preconditions.checkArgument(2 == each.getCode().length(), "Job type code length is not 2."); JOB_CODE_AND_TYPE_MAP.put(each.getCode(), each); } @@ -48,7 +48,7 @@ public final class JobCodeRegistry { * @param jobTypeCode job type code * @return job type */ - public static JobType getJobType(final String jobTypeCode) { + public static PipelineJobType getJobType(final String jobTypeCode) { Preconditions.checkArgument(JOB_CODE_AND_TYPE_MAP.containsKey(jobTypeCode), "Can not get job type by `%s`.", jobTypeCode); return JOB_CODE_AND_TYPE_MAP.get(jobTypeCode); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java similarity index 81% rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java index bb599664c64fe..4781e9c455269 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java @@ -17,14 +17,15 @@ package org.apache.shardingsphere.data.pipeline.common.job.type; +import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI; /** - * Job type. + * Pipeline job type. */ @SingletonSPI -public interface JobType extends TypedSPI { +public interface PipelineJobType extends TypedSPI { /** * Get job type code. @@ -33,6 +34,13 @@ public interface JobType extends TypedSPI { */ String getCode(); + /** + * Get job option. + * + * @return job option + */ + PipelineJobOption getOption(); + @Override String getType(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java index 53c1af6930568..b331b37854399 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.impl; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.JobConfigurationChangedProcessor; import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier; @@ -89,7 +89,7 @@ protected void executeJob(final JobConfiguration jobConfig) { protected abstract AbstractPipelineJob buildPipelineJob(String jobId); - protected abstract JobType getJobType(); + protected abstract PipelineJobType getJobType(); @Override public String getType() { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java index 78eecd175d1b6..8cf5e7ea051d2 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier; @@ -65,7 +66,7 @@ public abstract class AbstractPipelineJob implements PipelineJob { protected AbstractPipelineJob(final String jobId) { this.jobId = jobId; - jobOption = TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobId).getType()); + jobOption = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getOption(); } /** diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java index 9a8834dd83188..17ffc1d82dbfe 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java @@ -22,7 +22,7 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; /** * Abstract pipeline job id. @@ -33,7 +33,7 @@ public abstract class AbstractPipelineJobId implements PipelineJobId { public static final String CURRENT_VERSION = "02"; - private final JobType jobType; + private final PipelineJobType jobType; private final PipelineContextKey contextKey; diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java index 5611ac02e1c8e..65a825e04972d 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java @@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId; import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.util.InstanceTypeUtils; import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; @@ -64,7 +64,7 @@ public static String marshalJobIdCommonPrefix(final PipelineJobId pipelineJobId) * @param jobId job id * @return job type */ - public static JobType parseJobType(final String jobId) { + public static PipelineJobType parseJobType(final String jobId) { verifyJobId(jobId); return JobCodeRegistry.getJobType(jobId.substring(1, 3)); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java index 55f0c235f1d2a..ce65bf34c5792 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java @@ -24,7 +24,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration; import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper; import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; -import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI; import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration; import java.util.Optional; @@ -33,7 +32,7 @@ * Pipeline job option. */ @SingletonSPI -public interface PipelineJobOption extends TypedSPI { +public interface PipelineJobOption { /** * Get YAML pipeline job configuration swapper. @@ -95,6 +94,10 @@ default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() { */ Class getJobClass(); - @Override + /** + * Get job type. + * + * @return job type + */ String getType(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java index 0f7bc16a222ce..26c95849c910f 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java @@ -21,9 +21,9 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; @@ -130,7 +130,7 @@ private static synchronized void persist(final String jobId, final int shardingI } persistContext.getHasNewEvents().set(false); long startTimeMillis = System.currentTimeMillis(); - new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobId).getType()) + new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getOption() .getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get()); persistContext.getBeforePersistingProgressMillis().set(null); if (6 == ThreadLocalRandom.current().nextInt(100)) { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java index c02dc1355a894..a6c49fcb9659b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java @@ -58,8 +58,7 @@ public T getJobConfiguration(final String j public JobConfigurationPOJO convertToJobConfigurationPOJO(final PipelineJobConfiguration jobConfig) { JobConfigurationPOJO result = new JobConfigurationPOJO(); result.setJobName(jobConfig.getJobId()); - int shardingTotalCount = jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount(); - result.setShardingTotalCount(shardingTotalCount); + result.setShardingTotalCount(jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount()); result.setJobParameter(YamlEngine.marshal(jobOption.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig))); String createTimeFormat = LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()); result.getProps().setProperty("create_time", createTimeFormat); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index 717125ea11415..adc980be60bd2 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade; @@ -57,19 +58,17 @@ public final class PipelineJobManager { * Start job. * * @param jobConfig job configuration - * @return job id */ - public Optional start(final PipelineJobConfiguration jobConfig) { + public void start(final PipelineJobConfiguration jobConfig) { String jobId = jobConfig.getJobId(); ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId)); PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)); if (governanceFacade.getJobFacade().getConfiguration().isExisted(jobId)) { log.warn("jobId already exists in registry center, ignore, job id is `{}`", jobId); - return Optional.of(jobId); + return; } governanceFacade.getJobFacade().getJob().create(jobId, jobOption.getJobClass()); governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig)); - return Optional.of(jobId); } /** @@ -109,7 +108,7 @@ private void startCurrentDisabledJob(final String jobId) { private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) { PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> { try { - new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, toBeStartDisabledNextJobType)).startDisabledJob(optional); + new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStartDisabledNextJobType).getOption()).startDisabledJob(optional); // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON @@ -131,7 +130,7 @@ public void stop(final String jobId) { private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) { PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional -> { try { - new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, toBeStoppedPreviousJobType)).stop(optional); + new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, toBeStoppedPreviousJobType).getOption()).stop(optional); // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java index 6291279a01902..655dbbb85d31b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java @@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; @@ -66,7 +67,7 @@ public TransmissionTasksRunner(final TransmissionJobItemContext jobItemContext) this.jobItemContext = jobItemContext; inventoryTasks = jobItemContext.getInventoryTasks(); incrementalTasks = jobItemContext.getIncrementalTasks(); - jobOption = TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()); + jobOption = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption(); jobManager = new PipelineJobManager(jobOption); jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); } @@ -89,7 +90,7 @@ public void start() { if (jobItemContext.isStopping()) { return; } - new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()) + new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption() .getYamlJobItemProgressSwapper()).persistProgress(jobItemContext); if (PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) { log.info("All inventory tasks finished."); diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java index a1976f940ab06..d4de294616fc4 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java @@ -17,16 +17,25 @@ package org.apache.shardingsphere.data.pipeline.common.job.type; +import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; + +import static org.mockito.Mockito.mock; + /** * Fixture job type. */ -public final class FixtureJobType implements JobType { +public final class FixtureJobType implements PipelineJobType { @Override public String getCode() { return "00"; } + @Override + public PipelineJobOption getOption() { + return mock(PipelineJobOption.class); + } + @Override public String getType() { return "FIXTURE"; diff --git a/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType b/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType similarity index 100% rename from kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType rename to kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java index add405c928fc3..b385437e6ca62 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java @@ -20,8 +20,8 @@ import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement; import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; @@ -41,7 +41,7 @@ public final class ShowStreamingJobStatusExecutor implements QueryableRALExecuto @Override public Collection getRows(final ShowStreamingStatusStatement sqlStatement) { - TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, new CDCJobType().getType()); + TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, new CDCJobType().getType()).getOption(); List jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId()); long currentTimeMillis = System.currentTimeMillis(); return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList()); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java index 535299f0da221..843bb10a8e873 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java @@ -20,7 +20,7 @@ import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; @@ -40,7 +40,7 @@ public final class ShowStreamingRuleExecutor implements QueryableRALExecutor getRows(final ShowStreamingRuleStatement sqlStatement) { - PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, "STREAMING")) + PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "STREAMING").getOption()) .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY)); Collection result = new LinkedList<>(); result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java index bb146b4856a08..b6a597a82fb81 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java @@ -18,8 +18,8 @@ package org.apache.shardingsphere.migration.distsql.handler.query; import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; @@ -40,7 +40,7 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto @Override public Collection getRows(final ShowMigrationStatusStatement sqlStatement) { - TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION"); + TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption(); List jobItemInfos = new TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId()); long currentTimeMillis = System.currentTimeMillis(); return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList()); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/job/type/CDCJobType.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/job/type/CDCJobType.java index 95cfcbdb538fb..c073f45c89607 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/job/type/CDCJobType.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/job/type/CDCJobType.java @@ -17,18 +17,25 @@ package org.apache.shardingsphere.data.pipeline.cdc.api.job.type; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; +import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; /** * CDC job type. */ -public final class CDCJobType implements JobType { +public final class CDCJobType implements PipelineJobType { @Override public String getCode() { return "03"; } + @Override + public PipelineJobOption getOption() { + return new CDCJobOption(); + } + @Override public String getType() { return "STREAMING"; diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType b/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType similarity index 100% rename from kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType rename to kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption b/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption deleted file mode 100644 index 136bec61f328d..0000000000000 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption +++ /dev/null @@ -1,18 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java index 70856a72c1c98..cb9a8b6eeb067 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java @@ -20,7 +20,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.junit.jupiter.api.Test; @@ -37,7 +37,7 @@ void assertParseJobType() { PipelineContextKey contextKey = new PipelineContextKey("sharding_db", InstanceType.PROXY); CDCJobId pipelineJobId = new CDCJobId(contextKey, Arrays.asList("test", "t_order"), false, CDCSinkType.SOCKET.name()); String jobId = PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd"; - JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId); + PipelineJobType actualJobType = PipelineJobIdUtils.parseJobType(jobId); assertThat(actualJobType, instanceOf(CDCJobType.class)); } } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java index 87e71d934de21..ccc0dc3ad8d3a 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java @@ -17,18 +17,25 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; +import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; +import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobOption; /** * Consistency check job type. */ -public final class ConsistencyCheckJobType implements JobType { +public final class ConsistencyCheckJobType implements PipelineJobType { @Override public String getCode() { return "02"; } + @Override + public PipelineJobOption getOption() { + return new ConsistencyCheckJobOption(); + } + @Override public String getType() { return "CONSISTENCY_CHECK"; diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java index 89b4a40fc3a26..7d90fee414bb7 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.impl.AbstractJobConfigurationChangedProcessor; import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob; @@ -41,7 +41,7 @@ protected AbstractPipelineJob buildPipelineJob(final String jobId) { } @Override - protected JobType getJobType() { + protected PipelineJobType getJobType() { return new ConsistencyCheckJobType(); } } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index e5dc6c5ccc3ad..65fe73c5eae84 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -26,11 +26,10 @@ import org.apache.shardingsphere.data.pipeline.common.execute.PipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; @@ -85,7 +84,7 @@ public void start() { if (jobItemContext.isStopping()) { return; } - new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()) + new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption() .getYamlJobItemProgressSwapper()).persistProgress(jobItemContext); CompletableFuture future = jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor); ExecuteEngine.trigger(Collections.singletonList(future), new CheckExecuteCallback()); @@ -102,8 +101,8 @@ private final class CheckPipelineLifecycleRunnable extends AbstractPipelineLifec @Override protected void runBlocking() { jobItemManager.persistProgress(jobItemContext); - JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId); - TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, jobType.getType()); + PipelineJobType jobType = PipelineJobIdUtils.parseJobType(parentJobId); + TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption(); PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId); try { PipelineDataConsistencyChecker checker = jobOption.buildDataConsistencyChecker( diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType similarity index 100% rename from kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType rename to kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption deleted file mode 100644 index 31edf286e0753..0000000000000 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption +++ /dev/null @@ -1,18 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobOption diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java index ab7c82ebd9052..5afb2f50f7aa2 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java @@ -17,18 +17,25 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; +import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; +import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption; /** * Migration job type. */ -public final class MigrationJobType implements JobType { +public final class MigrationJobType implements PipelineJobType { @Override public String getCode() { return "01"; } + @Override + public PipelineJobOption getOption() { + return new MigrationJobOption(); + } + @Override public String getType() { return "MIGRATION"; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 649e0c994fad4..b89e63824d366 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -21,6 +21,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; @@ -49,10 +50,11 @@ public final class MigrationJobAPI implements TransmissionJobAPI { public void commit(final String jobId) { log.info("Commit job {}", jobId); final long startTimeMillis = System.currentTimeMillis(); - PipelineJobManager jobManager = new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, getType())); + PipelineJobOption jobOption = new MigrationJobOption(); + PipelineJobManager jobManager = new PipelineJobManager(jobOption); jobManager.stop(jobId); dropCheckJobs(jobId); - MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobOption.class, getType())).getJobConfiguration(jobId); + MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId); refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName()); jobManager.drop(jobId); log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis); @@ -70,7 +72,7 @@ public void rollback(final String jobId) throws SQLException { final long startTimeMillis = System.currentTimeMillis(); dropCheckJobs(jobId); cleanTempTableOnRollback(jobId); - new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, getType())).drop(jobId); + new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, getType()).getOption()).drop(jobId); log.info("Rollback job {} cost {} ms", jobId, System.currentTimeMillis() - startTimeMillis); } @@ -81,7 +83,7 @@ private void dropCheckJobs(final String jobId) { } for (String each : checkJobIds) { try { - new PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class, getType())).drop(each); + new PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class, getType()).getOption()).drop(each); // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON @@ -91,7 +93,7 @@ private void dropCheckJobs(final String jobId) { } private void cleanTempTableOnRollback(final String jobId) throws SQLException { - MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobOption.class, getType())).getJobConfiguration(jobId); + MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class, getType()).getOption()).getJobConfiguration(jobId); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType()); TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); try ( diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java index a15921a9adb29..fd2c2dc48b6eb 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.impl.AbstractJobConfigurationChangedProcessor; import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob; @@ -44,7 +44,7 @@ protected AbstractPipelineJob buildPipelineJob(final String jobId) { } @Override - protected JobType getJobType() { + protected PipelineJobType getJobType() { return new MigrationJobType(); } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType similarity index 100% rename from kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType rename to kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType diff --git a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption deleted file mode 100644 index c89d6bd11b8be..0000000000000 --- a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption +++ /dev/null @@ -1,18 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java index 1bf9b0046eddf..2e6cf74c7c127 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java @@ -19,7 +19,7 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; @@ -40,7 +40,7 @@ public final class ShowMigrationRuleExecutor implements QueryableRALExecutor getRows(final ShowMigrationRuleStatement sqlStatement) { - PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION")) + PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption()) .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY)); Collection result = new LinkedList<>(); result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java index 210da1a21d6fd..97c9b0dd3a245 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java @@ -19,7 +19,7 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption; +import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; @@ -35,7 +35,7 @@ public final class AlterTransmissionRuleUpdater implements RALUpdater jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); - assertTrue(jobId.isPresent()); - JobConfigurationPOJO jobConfigPOJO = getJobConfigurationPOJO(jobId.get()); + PipelineJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); + JobConfigurationPOJO jobConfigPOJO = getJobConfigurationPOJO(jobConfig.getJobId()); assertFalse(jobConfigPOJO.isDisabled()); assertThat(jobConfigPOJO.getShardingTotalCount(), is(1)); } @@ -142,46 +143,43 @@ private JobConfigurationPOJO getJobConfigurationPOJO(final String jobId) { @Test void assertStartOrStopById() { - Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); - assertTrue(jobId.isPresent()); - assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled()); + PipelineJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); + assertFalse(getJobConfigurationPOJO(jobConfig.getJobId()).isDisabled()); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); - jobManager.stop(jobId.get()); - assertTrue(getJobConfigurationPOJO(jobId.get()).isDisabled()); - jobManager.startDisabledJob(jobId.get()); - assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled()); + jobManager.stop(jobConfig.getJobId()); + assertTrue(getJobConfigurationPOJO(jobConfig.getJobId()).isDisabled()); + jobManager.startDisabledJob(jobConfig.getJobId()); + assertFalse(getJobConfigurationPOJO(jobConfig.getJobId()).isDisabled()); } @Test void assertRollback() throws SQLException { - Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); - assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId.get()); - initTableData(jobConfig); + PipelineJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); + initTableData(jobConfigManager.getJobConfiguration(jobConfig.getJobId())); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); - jobAPI.rollback(jobId.get()); - assertNull(getJobConfigurationPOJO(jobId.get())); + jobAPI.rollback(jobConfig.getJobId()); + assertNull(getJobConfigurationPOJO(jobConfig.getJobId())); } @Test void assertCommit() { - Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); - assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId.get()); - initTableData(jobConfig); + PipelineJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); + initTableData(jobConfigManager.getJobConfiguration(jobConfig.getJobId())); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); - jobAPI.commit(jobId.get()); - assertNull(getJobConfigurationPOJO(jobId.get())); + jobAPI.commit(jobConfig.getJobId()); + assertNull(getJobConfigurationPOJO(jobConfig.getJobId())); } @Test void assertGetProgress() { MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); - Optional jobId = jobManager.start(jobConfig); - assertTrue(jobId.isPresent()); + jobManager.start(jobConfig); Map jobProgressMap = transmissionJobManager.getJobProgress(jobConfig); assertThat(jobProgressMap.size(), is(1)); } @@ -190,10 +188,9 @@ void assertGetProgress() { void assertDataConsistencyCheck() { MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); initTableData(jobConfig); - Optional jobId = jobManager.start(jobConfig); - assertTrue(jobId.isPresent()); + jobManager.start(jobConfig); Map checkResultMap = jobOption.buildDataConsistencyChecker( - jobConfig, jobOption.buildProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", null); + jobConfig, jobOption.buildProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0, "H2")).check("FIXTURE", null); assertThat(checkResultMap.size(), is(1)); String checkKey = "t_order"; assertTrue(checkResultMap.get(checkKey).isMatched()); @@ -202,12 +199,11 @@ void assertDataConsistencyCheck() { @Test void assertSwitchClusterConfigurationSucceed() { - final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); - Optional jobId = jobManager.start(jobConfig); - assertTrue(jobId.isPresent()); + MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig); jobItemManager.persistProgress(jobItemContext); - jobItemManager.updateStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK); + jobItemManager.updateStatus(jobConfig.getJobId(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK); Map progress = transmissionJobManager.getJobProgress(jobConfig); for (Entry entry : progress.entrySet()) { assertThat(entry.getValue().getStatus(), is(JobStatus.EXECUTE_INVENTORY_TASK)); @@ -237,7 +233,7 @@ private void initTableData(final DataSource pipelineDataSource) throws SQLExcept @Test void assertRenewJobStatus() { - final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig); jobItemManager.persistProgress(jobItemContext); jobItemManager.updateStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED); @@ -306,13 +302,13 @@ void assertShowMigrationSourceResources() { @Test void assertGetJobItemInfosAtBegin() { - Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); - assertTrue(jobId.isPresent()); + MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); YamlTransmissionJobItemProgress yamlJobItemProgress = new YamlTransmissionJobItemProgress(); yamlJobItemProgress.setStatus(JobStatus.RUNNING.name()); yamlJobItemProgress.setSourceDatabaseType("MySQL"); - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress)); - List jobItemInfos = transmissionJobManager.getJobItemInfos(jobId.get()); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, YamlEngine.marshal(yamlJobItemProgress)); + List jobItemInfos = transmissionJobManager.getJobItemInfos(jobConfig.getJobId()); assertThat(jobItemInfos.size(), is(1)); TransmissionJobItemInfo jobItemInfo = jobItemInfos.get(0); assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.RUNNING)); @@ -321,15 +317,15 @@ void assertGetJobItemInfosAtBegin() { @Test void assertGetJobItemInfosAtIncrementTask() { - Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); - assertTrue(jobId.isPresent()); + MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); + jobManager.start(jobConfig); YamlTransmissionJobItemProgress yamlJobItemProgress = new YamlTransmissionJobItemProgress(); yamlJobItemProgress.setSourceDatabaseType("MySQL"); yamlJobItemProgress.setStatus(JobStatus.EXECUTE_INCREMENTAL_TASK.name()); yamlJobItemProgress.setProcessedRecordsCount(100); yamlJobItemProgress.setInventoryRecordsCount(50); - PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress)); - List jobItemInfos = transmissionJobManager.getJobItemInfos(jobId.get()); + PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 0, YamlEngine.marshal(yamlJobItemProgress)); + List jobItemInfos = transmissionJobManager.getJobItemInfos(jobConfig.getJobId()); TransmissionJobItemInfo jobItemInfo = jobItemInfos.get(0); assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.EXECUTE_INCREMENTAL_TASK)); assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(100));