diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java index 2ef78b7e8bf25..54ee4ee19ec97 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java @@ -17,7 +17,6 @@ package org.apache.shardingsphere.data.pipeline.core.job.service; -import com.google.common.base.Preconditions; import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; @@ -30,7 +29,6 @@ import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper; import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; @@ -145,16 +143,4 @@ public JobOffsetInfo getJobOffsetInfo(final String jobId) { Optional offsetInfo = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId); return new YamlJobOffsetInfoSwapper().swapToObject(offsetInfo.isPresent() ? YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class) : new YamlJobOffsetInfo()); } - - /** - * Aggregate data consistency check results. - * - * @param jobId job ID - * @param checkResults check results - * @return check success or not - */ - public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map checkResults) { - Preconditions.checkArgument(!checkResults.isEmpty(), "checkResults empty, jobId:", jobId); - return checkResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched); - } } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 63e6ff424362a..cf166153d5422 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -31,8 +31,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException; import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; @@ -47,7 +45,6 @@ import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; -import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import java.sql.Timestamp; import java.time.Duration; @@ -235,9 +232,9 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) { result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse("")); fillInJobItemInfoWithCheckAlgorithm(result, checkJobId); result.setErrorMessage(new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()).getErrorMessage(checkJobId, 0)); - Map checkJobResult = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId); - fillInJobItemInfoWithCheckResult(result, checkJobResult, parentJobId); - result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched()) + Map checkJobResults = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId); + result.setCheckSuccess(checkJobResults.isEmpty() ? null : checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched)); + result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched()) .map(Entry::getKey).collect(Collectors.joining(","))); return result; } @@ -275,16 +272,6 @@ private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemIn } } - private void fillInJobItemInfoWithCheckResult(final ConsistencyCheckJobItemInfo result, final Map checkJobResult, final String parentJobId) { - if (checkJobResult.isEmpty()) { - result.setCheckSuccess(null); - } else { - InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager( - (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType())); - result.setCheckSuccess(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult)); - } - } - @Override public YamlConsistencyCheckJobConfigurationSwapper getYamlJobConfigurationSwapper() { return new YamlConsistencyCheckJobConfigurationSwapper(); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 84a4e55572e71..ce9b79e1e49e8 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -67,7 +67,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -193,27 +192,6 @@ void assertDataConsistencyCheck() { assertTrue(checkResultMap.get(checkKey).isMatched()); } - @Test - void assertAggregateEmptyDataConsistencyCheckResults() { - assertThrows(IllegalArgumentException.class, () -> inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job", Collections.emptyMap())); - } - - @Test - void assertAggregateDifferentDataConsistencyCheckResults() { - Map checkResults = new LinkedHashMap<>(2, 1F); - checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true)); - checkResults.put("bar_tbl", new TableDataConsistencyCheckResult(false)); - assertFalse(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job", checkResults)); - } - - @Test - void assertAggregateSameDataConsistencyCheckResults() { - Map checkResults = new LinkedHashMap<>(2, 1F); - checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true)); - checkResults.put("bar_tbl", new TableDataConsistencyCheckResult(true)); - assertTrue(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job", checkResults)); - } - @Test void assertSwitchClusterConfigurationSucceed() { final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();