From bf156c822d75afa53f8da1475759f83f2b416de7 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 11 Nov 2023 12:27:56 +0800 Subject: [PATCH] Refactor PipelineJobAPI --- .../pipeline/core/job/service/PipelineJobAPI.java | 13 ++++--------- .../service/impl/AbstractPipelineJobAPIImpl.java | 5 ----- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 6 ++---- .../consistencycheck/ConsistencyCheckJobType.java | 4 +--- .../api/impl/ConsistencyCheckJobAPI.java | 9 +++------ .../scenario/migration/MigrationJobType.java | 4 +--- .../migration/api/impl/MigrationJobAPI.java | 7 ++----- 7 files changed, 13 insertions(+), 35 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java index 94e5c5e34dc4a..5a2bfefcbc5cc 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java @@ -17,7 +17,6 @@ package org.apache.shardingsphere.data.pipeline.core.job.service; -import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; @@ -27,8 +26,8 @@ import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId; import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration; import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI; @@ -41,13 +40,6 @@ @SingletonSPI public interface PipelineJobAPI extends TypedSPI { - /** - * Get job type. - * - * @return job type - */ - JobType getJobType(); - /** * Marshal pipeline job id. * @@ -177,4 +169,7 @@ public interface PipelineJobAPI extends TypedSPI { * @param shardingItem sharding item */ void cleanJobItemErrorMessage(String jobId, int shardingItem); + + @Override + String getType(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java index 6c353f52694a9..9a0cf65b82c5d 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java @@ -168,11 +168,6 @@ public final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) { return result; } - @Override - public String getType() { - return getJobType().getType(); - } - @Override public String getJobItemErrorMessage(final String jobId, final int shardingItem) { return Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId, shardingItem)).orElse(""); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 09ceb2debd5fe..e203e1a2533c2 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -24,7 +24,6 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration; @@ -52,7 +51,6 @@ import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId; import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; @@ -368,7 +366,7 @@ protected String getJobClassName() { } @Override - public JobType getJobType() { - return new CDCJobType(); + public String getType() { + return "STREAMING"; } } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java index ce87dac4dfa07..87e71d934de21 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java @@ -24,11 +24,9 @@ */ public final class ConsistencyCheckJobType implements JobType { - public static final String TYPE_CODE = "02"; - @Override public String getCode() { - return TYPE_CODE; + return "02"; } @Override diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index b9d1a11426f47..087ee6833942b 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -19,7 +19,6 @@ import com.google.common.base.Strings; import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; @@ -31,8 +30,6 @@ import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI; @@ -47,9 +44,9 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractPipelineJobAPIImpl; +import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId; -import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration; @@ -405,7 +402,7 @@ protected String getJobClassName() { } @Override - public JobType getJobType() { - return JobCodeRegistry.getJobType(ConsistencyCheckJobType.TYPE_CODE); + public String getType() { + return "CONSISTENCY_CHECK"; } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java index 92d305bca0f36..ab7c82ebd9052 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java @@ -24,11 +24,9 @@ */ public final class MigrationJobType implements JobType { - public static final String TYPE_CODE = "01"; - @Override public String getCode() { - return TYPE_CODE; + return "01"; } @Override diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 1c310065bc5f2..2e273e15c4006 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -38,8 +38,6 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry; -import org.apache.shardingsphere.data.pipeline.common.job.type.JobType; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils; @@ -63,7 +61,6 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId; -import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType; import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration; @@ -490,8 +487,8 @@ public void refreshTableMetadata(final String jobId, final String databaseName) } @Override - public JobType getJobType() { - return JobCodeRegistry.getJobType(MigrationJobType.TYPE_CODE); + public String getType() { + return "MIGRATION"; } @Override