Skip to content

Commit

Permalink
Refactor ConsistencyCheckJobOption
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 25, 2023
1 parent 267bb42 commit 0aebf48
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.data.pipeline.core.exception.job;

import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;

Expand All @@ -27,7 +28,7 @@ public final class UncompletedConsistencyCheckJobExistsException extends Pipelin

private static final long serialVersionUID = 2854259384634892428L;

public UncompletedConsistencyCheckJobExistsException(final String jobId) {
super(XOpenSQLState.GENERAL_ERROR, 96, String.format("Uncompleted consistency check job `%s` exists.", jobId));
public UncompletedConsistencyCheckJobExistsException(final String jobId, final ConsistencyCheckJobItemProgress progress) {
super(XOpenSQLState.GENERAL_ERROR, 96, String.format("Uncompleted consistency check job `%s` exists, progress `%s`", jobId, progress));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
Expand Down Expand Up @@ -80,35 +81,39 @@ public String createJobAndStart(final CreateConsistencyCheckJobParameter param)
PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId));
Optional<String> latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
if (latestCheckJobId.isPresent()) {
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> progress = jobItemManager.getProgress(latestCheckJobId.get(), 0);
if (!progress.isPresent() || JobStatus.FINISHED != progress.get().getStatus()) {
log.info("check job already exists and status is not FINISHED, progress={}", progress);
throw new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get());
}
Optional<ConsistencyCheckJobItemProgress> progress = new PipelineJobItemManager<ConsistencyCheckJobItemProgress>(getYamlJobItemProgressSwapper()).getProgress(latestCheckJobId.get(), 0);
ShardingSpherePreconditions.checkState(progress.isPresent() && JobStatus.FINISHED == progress.get().getStatus(),
() -> new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get(), progress.orElse(null)));
}
verifyPipelineDatabaseType(param);
checkPipelineDatabaseType(param);
PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
String result = latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
String result = latestCheckJobId.map(optional -> new ConsistencyCheckJobId(contextKey, parentJobId, optional)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, result);
governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId, result);
new PipelineJobManager(this).drop(result);
YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration();
yamlConfig.setJobId(result);
yamlConfig.setParentJobId(parentJobId);
yamlConfig.setAlgorithmTypeName(param.getAlgorithmTypeName());
yamlConfig.setAlgorithmProps(param.getAlgorithmProps());
yamlConfig.setSourceDatabaseType(param.getSourceDatabaseType().getType());
new PipelineJobManager(this).start(new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig));
new PipelineJobManager(this).start(new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(getYamlConfiguration(result, parentJobId, param)));
return result;
}

private void verifyPipelineDatabaseType(final CreateConsistencyCheckJobParameter param) {
Collection<DatabaseType> supportedDatabaseTypes = TableDataConsistencyCheckerFactory.newInstance(param.getAlgorithmTypeName(), param.getAlgorithmProps()).getSupportedDatabaseTypes();
/**
 * Verify that both the source and target database types are supported by the configured consistency check algorithm.
 *
 * @param param create consistency check job parameter carrying algorithm name, properties and database types
 * @throws UnsupportedPipelineDatabaseTypeException if either database type is not supported by the checker
 */
private void checkPipelineDatabaseType(final CreateConsistencyCheckJobParameter param) {
    final Collection<DatabaseType> supportedTypes;
    // Open the checker only long enough to query its supported types; try-with-resources guarantees release.
    try (TableDataConsistencyChecker tableChecker = TableDataConsistencyCheckerFactory.newInstance(param.getAlgorithmTypeName(), param.getAlgorithmProps())) {
        supportedTypes = tableChecker.getSupportedDatabaseTypes();
    }
    // Check source first, then target, so the source failure is reported first — same order as before.
    for (final DatabaseType each : new DatabaseType[]{param.getSourceDatabaseType(), param.getTargetDatabaseType()}) {
        ShardingSpherePreconditions.checkState(supportedTypes.contains(each), () -> new UnsupportedPipelineDatabaseTypeException(each));
    }
}

/**
 * Build the YAML consistency check job configuration for a newly created check job.
 *
 * @param jobId consistency check job ID to assign
 * @param parentJobId ID of the parent (migration) job being checked
 * @param param creation parameter supplying algorithm type name, algorithm properties and source database type
 * @return populated YAML consistency check job configuration
 */
private YamlConsistencyCheckJobConfiguration getYamlConfiguration(final String jobId, final String parentJobId, final CreateConsistencyCheckJobParameter param) {
    YamlConsistencyCheckJobConfiguration result = new YamlConsistencyCheckJobConfiguration();
    // Identity of this check job and its parent pipeline job.
    result.setParentJobId(parentJobId);
    result.setJobId(jobId);
    // Consistency check algorithm selection and its tuning properties.
    result.setAlgorithmProps(param.getAlgorithmProps());
    result.setAlgorithmTypeName(param.getAlgorithmTypeName());
    // Only the type name is persisted, not the full DatabaseType object.
    result.setSourceDatabaseType(param.getSourceDatabaseType().getType());
    return result;
}

@Override
public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
return true;
Expand Down

0 comments on commit 0aebf48

Please sign in to comment.