diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java index 98f021402428f..1422aed41dd26 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessEngine.java @@ -21,6 +21,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; +import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier; @@ -40,8 +41,11 @@ public final class JobConfigurationChangedProcessEngine { * @param eventType event type * @param jobConfig pipeline job configuration * @param processor pipeline job configuration changed processor + * @param type of pipeline job configuration */ - public void process(final Type eventType, final JobConfiguration jobConfig, final JobConfigurationChangedProcessor processor) { + @SuppressWarnings("unchecked") + public void process(final Type eventType, final JobConfiguration jobConfig, final JobConfigurationChangedProcessor processor) { + T pipelineJobConfig = (T) PipelineJobIdUtils.parseJobType(jobConfig.getJobName()).getYamlJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter()); String jobId = jobConfig.getJobName(); if (jobConfig.isDisabled()) { PipelineJobRegistry.stop(jobId); @@ -54,7 +58,7 @@ public void process(final Type eventType, final JobConfiguration jobConfig, fina if (PipelineJobRegistry.isExisting(jobId)) { log.info("{} added to executing jobs failed since it already exists", jobId); } else { - executeJob(jobConfig, processor); + executeJob(jobConfig, pipelineJobConfig, processor); } break; case DELETED: @@ -73,8 +77,8 @@ private void disableJob(final String jobId) { } } - private void executeJob(final JobConfiguration jobConfig, final JobConfigurationChangedProcessor processor) { - PipelineJob job = processor.createJob(); + private void executeJob(final JobConfiguration jobConfig, final T pipelineJobConfig, final JobConfigurationChangedProcessor processor) { + PipelineJob job = processor.createJob(pipelineJobConfig); String jobId = jobConfig.getJobName(); PipelineJobRegistry.add(jobId, job); OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfig); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessor.java index 2a1d53831cb32..88b3253cef313 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessor.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/config/processor/JobConfigurationChangedProcessor.java @@ -18,27 +18,32 @@ package org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob; +import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration; import org.apache.shardingsphere.elasticjob.api.JobConfiguration; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI; /** * Job configuration changed processor. + * + * @param type of pipeline job configuration */ -public interface JobConfigurationChangedProcessor extends TypedSPI { +public interface JobConfigurationChangedProcessor extends TypedSPI { /** * Create pipeline job. - * + * + * @param jobConfig pipeline job configuration * @return pipeline job */ - PipelineJob createJob(); + PipelineJob createJob(T jobConfig); /** * Clean pipeline job. * * @param jobConfig pipeline job configuration */ - void clean(JobConfiguration jobConfig); + default void clean(JobConfiguration jobConfig) { + } @Override String getType(); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java index 5775ed9a2d363..4afb66b446ac2 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java @@ -20,22 +20,18 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob; import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob; -import org.apache.shardingsphere.elasticjob.api.JobConfiguration; +import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration; /** * Consistency check job configuration changed processor. */ -public final class ConsistencyCheckJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor { +public final class ConsistencyCheckJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor { @Override - public PipelineJob createJob() { + public PipelineJob createJob(final ConsistencyCheckJobConfiguration jobConfig) { return new ConsistencyCheckJob(); } - @Override - public void clean(final JobConfiguration jobConfig) { - } - @Override public String getType() { return "CONSISTENCY_CHECK"; diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java index abb422700d91a..39545ca035574 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java @@ -20,6 +20,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob; import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob; +import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer; import org.apache.shardingsphere.elasticjob.api.JobConfiguration; @@ -27,10 +28,10 @@ /** * Migration job configuration changed processor. */ -public final class MigrationJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor { +public final class MigrationJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor { @Override - public PipelineJob createJob() { + public PipelineJob createJob(final MigrationJobConfiguration jobConfig) { return new MigrationJob(); }