Skip to content

Commit

Permalink
Refactor PipelineJobAPI (#29007)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 11, 2023
1 parent 213166a commit 720a250
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.shardingsphere.data.pipeline.core.job.service;

import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
Expand All @@ -27,8 +26,8 @@
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;

Expand All @@ -41,13 +40,6 @@
@SingletonSPI
public interface PipelineJobAPI extends TypedSPI {

/**
* Get job type.
*
* @return job type
*/
JobType getJobType();

/**
* Marshal pipeline job id.
*
Expand Down Expand Up @@ -177,4 +169,7 @@ public interface PipelineJobAPI extends TypedSPI {
* @param shardingItem sharding item
*/
void cleanJobItemErrorMessage(String jobId, int shardingItem);

@Override
String getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,6 @@ public final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) {
return result;
}

@Override
public String getType() {
return getJobType().getType();
}

@Override
public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
return Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId, shardingItem)).orElse("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
Expand Down Expand Up @@ -52,7 +51,6 @@
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
Expand Down Expand Up @@ -368,7 +366,7 @@ protected String getJobClassName() {
}

@Override
public JobType getJobType() {
return new CDCJobType();
public String getType() {
return "STREAMING";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
*/
public final class ConsistencyCheckJobType implements JobType {

public static final String TYPE_CODE = "02";

@Override
public String getCode() {
return TYPE_CODE;
return "02";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
Expand All @@ -31,8 +30,6 @@
import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
Expand All @@ -47,9 +44,9 @@
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractPipelineJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration;
Expand Down Expand Up @@ -405,7 +402,7 @@ protected String getJobClassName() {
}

@Override
public JobType getJobType() {
return JobCodeRegistry.getJobType(ConsistencyCheckJobType.TYPE_CODE);
public String getType() {
return "CONSISTENCY_CHECK";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
*/
public final class MigrationJobType implements JobType {

public static final String TYPE_CODE = "01";

@Override
public String getCode() {
return TYPE_CODE;
return "01";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
Expand All @@ -63,7 +61,6 @@
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
Expand Down Expand Up @@ -490,8 +487,8 @@ public void refreshTableMetadata(final String jobId, final String databaseName)
}

@Override
public JobType getJobType() {
return JobCodeRegistry.getJobType(MigrationJobType.TYPE_CODE);
public String getType() {
return "MIGRATION";
}

@Override
Expand Down

0 comments on commit 720a250

Please sign in to comment.