diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java index 59c4a42fb7afb..9f7fd928521df 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java @@ -24,6 +24,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; +import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob; import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration; @@ -160,6 +161,13 @@ public interface PipelineJobAPI extends TypedSPI { */ void cleanJobItemErrorMessage(String jobId, int shardingItem); + /** + * Get pipeline job class. + * + * @return pipeline job class + */ + Class getPipelineJobClass(); + @Override String getType(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java index 5dbd46b1cb9cc..fa8a1f78e253e 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java @@ -60,13 +60,11 @@ public Optional start(final PipelineJobConfiguration jobConfig) { log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey); return Optional.of(jobId); } - repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), getJobClassName()); + repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), getPipelineJobClass().getName()); repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(convertJobConfiguration(jobConfig))); return Optional.of(jobId); } - protected abstract String getJobClassName(); - protected JobConfigurationPOJO convertJobConfiguration(final PipelineJobConfiguration jobConfig) { JobConfigurationPOJO result = new JobConfigurationPOJO(); result.setJobName(jobConfig.getJobId()); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 482b05d21d5d9..9c99ae5f0967f 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -122,7 +122,7 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT if (repositoryAPI.isExisted(jobConfigKey)) { log.warn("CDC job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey); } else { - repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClassName()); + repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getPipelineJobClass().getName()); JobConfigurationPOJO jobConfigPOJO = convertJobConfiguration(jobConfig); jobConfigPOJO.setDisabled(true); repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO)); @@ -345,8 +345,8 @@ public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final } @Override - protected String getJobClassName() { - return CDCJob.class.getName(); + public Class getPipelineJobClass() { + return CDCJob.class; } @Override diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 7a894858c807d..58d4051ea0829 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -390,8 +390,8 @@ public PipelineJobInfo getJobInfo(final String jobId) { } @Override - protected String getJobClassName() { - return ConsistencyCheckJob.class.getName(); + public Class getPipelineJobClass() { + return ConsistencyCheckJob.class; } @Override diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 7c3cbcd214136..4a6a2b2c37cbc 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -473,12 +473,12 @@ public void refreshTableMetadata(final String jobId, final String databaseName) } @Override - public String getType() { - return "MIGRATION"; + public Class getPipelineJobClass() { + return MigrationJob.class; } @Override - protected String getJobClassName() { - return MigrationJob.class.getName(); + public String getType() { + return "MIGRATION"; } }