From 0aebf485c38ef219efb80e03695482f17f921a13 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 26 Nov 2023 01:01:07 +0800 Subject: [PATCH] Refactor ConsistencyCheckJobOption --- ...tedConsistencyCheckJobExistsException.java | 5 ++- .../api/impl/ConsistencyCheckJobOption.java | 39 +++++++++++-------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java index f437ad6d5472d..2465838abc4ca 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.exception.job; +import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress; import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException; import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState; @@ -27,7 +28,7 @@ public final class UncompletedConsistencyCheckJobExistsException extends Pipelin private static final long serialVersionUID = 2854259384634892428L; - public UncompletedConsistencyCheckJobExistsException(final String jobId) { - super(XOpenSQLState.GENERAL_ERROR, 96, String.format("Uncompleted consistency check job `%s` exists.", jobId)); + public UncompletedConsistencyCheckJobExistsException(final String jobId, final ConsistencyCheckJobItemProgress progress) { + super(XOpenSQLState.GENERAL_ERROR, 96, String.format("Uncompleted consistency check job `%s` exists, progress `%s`", jobId, progress)); } } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOption.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOption.java index c793a22bd500f..2505835765183 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOption.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobOption.java @@ -26,6 +26,7 @@ import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory; import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException; import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException; @@ -80,35 +81,39 @@ public String createJobAndStart(final CreateConsistencyCheckJobParameter param) PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)); Optional latestCheckJobId = governanceFacade.getJobFacade().getCheck().getLatestCheckJobId(parentJobId); if (latestCheckJobId.isPresent()) { - PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()); - Optional progress = jobItemManager.getProgress(latestCheckJobId.get(), 0); - if (!progress.isPresent() || JobStatus.FINISHED != progress.get().getStatus()) { - log.info("check job already exists and status is not FINISHED, progress={}", progress); - throw new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get()); - } + Optional progress = new PipelineJobItemManager(getYamlJobItemProgressSwapper()).getProgress(latestCheckJobId.get(), 0); + ShardingSpherePreconditions.checkState(progress.isPresent() && JobStatus.FINISHED == progress.get().getStatus(), + () -> new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get(), progress.orElse(null))); } - verifyPipelineDatabaseType(param); + checkPipelineDatabaseType(param); PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId); - String result = latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal(); + String result = latestCheckJobId.map(optional -> new ConsistencyCheckJobId(contextKey, parentJobId, optional)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal(); governanceFacade.getJobFacade().getCheck().persistLatestCheckJobId(parentJobId, result); governanceFacade.getJobFacade().getCheck().deleteCheckJobResult(parentJobId, result); new PipelineJobManager(this).drop(result); - YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration(); - yamlConfig.setJobId(result); - yamlConfig.setParentJobId(parentJobId); - yamlConfig.setAlgorithmTypeName(param.getAlgorithmTypeName()); - yamlConfig.setAlgorithmProps(param.getAlgorithmProps()); - yamlConfig.setSourceDatabaseType(param.getSourceDatabaseType().getType()); - new PipelineJobManager(this).start(new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig)); + new PipelineJobManager(this).start(new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(getYamlConfiguration(result, parentJobId, param))); return result; } - private void verifyPipelineDatabaseType(final CreateConsistencyCheckJobParameter param) { - Collection supportedDatabaseTypes = TableDataConsistencyCheckerFactory.newInstance(param.getAlgorithmTypeName(), param.getAlgorithmProps()).getSupportedDatabaseTypes(); + private void checkPipelineDatabaseType(final CreateConsistencyCheckJobParameter param) { + Collection supportedDatabaseTypes; + try (TableDataConsistencyChecker checker = TableDataConsistencyCheckerFactory.newInstance(param.getAlgorithmTypeName(), param.getAlgorithmProps())) { + supportedDatabaseTypes = checker.getSupportedDatabaseTypes(); + } ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getSourceDatabaseType()), () -> new UnsupportedPipelineDatabaseTypeException(param.getSourceDatabaseType())); ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getTargetDatabaseType()), () -> new UnsupportedPipelineDatabaseTypeException(param.getTargetDatabaseType())); } + private YamlConsistencyCheckJobConfiguration getYamlConfiguration(final String jobId, final String parentJobId, final CreateConsistencyCheckJobParameter param) { + YamlConsistencyCheckJobConfiguration result = new YamlConsistencyCheckJobConfiguration(); + result.setJobId(jobId); + result.setParentJobId(parentJobId); + result.setAlgorithmTypeName(param.getAlgorithmTypeName()); + result.setAlgorithmProps(param.getAlgorithmProps()); + result.setSourceDatabaseType(param.getSourceDatabaseType().getType()); + return result; + } + @Override public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() { return true;