Skip to content

Commit

Permalink
Remove InventoryIncrementalJobManager.aggregateDataConsistencyCheckRe…
Browse files Browse the repository at this point in the history
…sults()
  • Loading branch information
terrymanu committed Nov 20, 2023
1 parent be4bf84 commit c4f5c91
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.shardingsphere.data.pipeline.core.job.service;

import com.google.common.base.Preconditions;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
Expand All @@ -30,7 +29,6 @@
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
Expand Down Expand Up @@ -145,16 +143,4 @@ public JobOffsetInfo getJobOffsetInfo(final String jobId) {
Optional<String> offsetInfo = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
return new YamlJobOffsetInfoSwapper().swapToObject(offsetInfo.isPresent() ? YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class) : new YamlJobOffsetInfo());
}

/**
* Aggregate data consistency check results.
*
* @param jobId job ID
* @param checkResults check results
* @return check success or not
*/
public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, TableDataConsistencyCheckResult> checkResults) {
Preconditions.checkArgument(!checkResults.isEmpty(), "checkResults empty, jobId:", jobId);
return checkResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
Expand All @@ -47,7 +45,6 @@
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

import java.sql.Timestamp;
import java.time.Duration;
Expand Down Expand Up @@ -235,9 +232,9 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) {
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
result.setErrorMessage(new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()).getErrorMessage(checkJobId, 0));
Map<String, TableDataConsistencyCheckResult> checkJobResult = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
fillInJobItemInfoWithCheckResult(result, checkJobResult, parentJobId);
result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched())
Map<String, TableDataConsistencyCheckResult> checkJobResults = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
result.setCheckSuccess(checkJobResults.isEmpty() ? null : checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched())
.map(Entry::getKey).collect(Collectors.joining(",")));
return result;
}
Expand Down Expand Up @@ -275,16 +272,6 @@ private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemIn
}
}

private void fillInJobItemInfoWithCheckResult(final ConsistencyCheckJobItemInfo result, final Map<String, TableDataConsistencyCheckResult> checkJobResult, final String parentJobId) {
if (checkJobResult.isEmpty()) {
result.setCheckSuccess(null);
} else {
InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(
(InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType()));
result.setCheckSuccess(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult));
}
}

@Override
public YamlConsistencyCheckJobConfigurationSwapper getYamlJobConfigurationSwapper() {
return new YamlConsistencyCheckJobConfigurationSwapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -193,27 +192,6 @@ void assertDataConsistencyCheck() {
assertTrue(checkResultMap.get(checkKey).isMatched());
}

@Test
void assertAggregateEmptyDataConsistencyCheckResults() {
assertThrows(IllegalArgumentException.class, () -> inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job", Collections.emptyMap()));
}

@Test
void assertAggregateDifferentDataConsistencyCheckResults() {
Map<String, TableDataConsistencyCheckResult> checkResults = new LinkedHashMap<>(2, 1F);
checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true));
checkResults.put("bar_tbl", new TableDataConsistencyCheckResult(false));
assertFalse(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job", checkResults));
}

@Test
void assertAggregateSameDataConsistencyCheckResults() {
Map<String, TableDataConsistencyCheckResult> checkResults = new LinkedHashMap<>(2, 1F);
checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true));
checkResults.put("bar_tbl", new TableDataConsistencyCheckResult(true));
assertTrue(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job", checkResults));
}

@Test
void assertSwitchClusterConfigurationSucceed() {
final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
Expand Down

0 comments on commit c4f5c91

Please sign in to comment.