From 72d50eb0b8a4f3a3a66ed59709f859545c3f9eaf Mon Sep 17 00:00:00 2001 From: Hongsheng Zhong Date: Tue, 19 Sep 2023 19:21:39 +0800 Subject: [PATCH 1/5] Refactor YamlTableDataConsistencyCheckResult.addRecordsCount --- .../YamlTableDataConsistencyCheckResult.java | 20 +++++++++++-------- .../table/MatchingTableInventoryChecker.java | 8 ++++---- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java index 6dc803ec3edef..9a1e417449293 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/yaml/YamlTableDataConsistencyCheckResult.java @@ -54,17 +54,21 @@ public static class YamlTableDataConsistencyCountCheckResult implements YamlConf private long targetRecordsCount; /** - * Add records count. + * Add source records count. * * @param delta delta count - * @param onSource add on source or target */ - public void addRecordsCount(final long delta, final boolean onSource) { - if (onSource) { - sourceRecordsCount += delta; - } else { - targetRecordsCount += delta; - } + public void addSourceRecordsCount(final long delta) { + sourceRecordsCount += delta; + } + + /** + * Add target records count. + * + * @param delta delta count + */ + public void addTargetRecordsCount(final long delta) { + targetRecordsCount += delta; } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java index 3e485e97f52ad..8e4734aabc85b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java @@ -100,8 +100,8 @@ private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iter } SingleTableInventoryCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next)); SingleTableInventoryCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next)); - checkResult.getCountCheckResult().addRecordsCount(sourceCalculatedResult.getRecordsCount(), true); - checkResult.getCountCheckResult().addRecordsCount(targetCalculatedResult.getRecordsCount(), false); + checkResult.getCountCheckResult().addSourceRecordsCount(sourceCalculatedResult.getRecordsCount()); + checkResult.getCountCheckResult().addTargetRecordsCount(targetCalculatedResult.getRecordsCount()); if (!Objects.equals(sourceCalculatedResult, targetCalculatedResult)) { checkResult.getContentCheckResult().setMatched(false); log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), param.getTargetTable(), param.getUniqueKeys()); @@ -117,12 +117,12 @@ private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iter } if (sourceCalculatedResults.hasNext()) { // TODO Refactor SingleTableInventoryCalculatedResult to represent inaccurate number - checkResult.getCountCheckResult().addRecordsCount(1, true); + checkResult.getCountCheckResult().addSourceRecordsCount(1); checkResult.getContentCheckResult().setMatched(false); return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult); } if (targetCalculatedResults.hasNext()) { - checkResult.getCountCheckResult().addRecordsCount(1, false); + checkResult.getCountCheckResult().addTargetRecordsCount(1); checkResult.getContentCheckResult().setMatched(false); return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult); } From 42ec24e0851166469eb8b0d78f017ca3e3cb4902 Mon Sep 17 00:00:00 2001 From: Hongsheng Zhong Date: Tue, 19 Sep 2023 19:30:23 +0800 Subject: [PATCH 2/5] Refactor DataMatchTableDataConsistencyCheckerTest.assertInitSuccess --- .../DataMatchTableDataConsistencyCheckerTest.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/DataMatchTableDataConsistencyCheckerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/DataMatchTableDataConsistencyCheckerTest.java index 01a3dddc40ec2..0f17d07c1676a 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/DataMatchTableDataConsistencyCheckerTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/DataMatchTableDataConsistencyCheckerTest.java @@ -17,21 +17,29 @@ package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator; +import lombok.SneakyThrows; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.DataMatchTableDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.junit.jupiter.api.Test; +import org.mockito.internal.configuration.plugins.Plugins; import java.util.Arrays; import java.util.Properties; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; class DataMatchTableDataConsistencyCheckerTest { + @SneakyThrows(ReflectiveOperationException.class) @Test void assertInitSuccess() { for (String each : Arrays.asList("1", "1000")) { - new DataMatchTableDataConsistencyChecker().init(buildAlgorithmProperties(each)); + DataMatchTableDataConsistencyChecker checker = new DataMatchTableDataConsistencyChecker(); + checker.init(buildAlgorithmProperties(each)); + String actual = Plugins.getMemberAccessor().get(DataMatchTableDataConsistencyChecker.class.getDeclaredField("chunkSize"), checker).toString(); + assertThat(actual, is(each)); } } From a3b4b7debfd29bbfc37f21866249b1b92209d2dc Mon Sep 17 00:00:00 2001 From: Hongsheng Zhong Date: Tue, 19 Sep 2023 20:15:50 +0800 Subject: [PATCH 3/5] Reduce Cognitive Complexity for ConsistencyCheckJobAPI.getJobItemInfo --- .../api/impl/ConsistencyCheckJobAPI.java | 39 +++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 8077fb46dff0f..029750013e254 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -311,9 +311,28 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) { result.setCheckSuccess(null); return result; } - LocalDateTime checkBeginTime = new Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime(); + fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO); + result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse("")); + fillInJobItemInfoWithAlgorithm(result, checkJobId); + result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0)); + Map checkJobResult = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId); + if (checkJobResult.isEmpty()) { + result.setCheckSuccess(null); + } else { + InventoryIncrementalJobAPI inventoryIncrementalJobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService( + PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType()); + result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult)); + } + result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched()) + .map(Entry::getKey).collect(Collectors.joining(","))); + return result; + } + + private void fillInJobItemInfoWithTimes(final ConsistencyCheckJobItemInfo result, final ConsistencyCheckJobItemProgress jobItemProgress, final JobConfigurationPOJO jobConfigPOJO) { long recordsCount = jobItemProgress.getRecordsCount(); long checkedRecordsCount = Math.min(jobItemProgress.getCheckedRecordsCount(), recordsCount); + LocalDateTime checkBeginTime = new Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime(); + result.setCheckBeginTime(DATE_TIME_FORMATTER.format(checkBeginTime)); if (JobStatus.FINISHED == jobItemProgress.getStatus()) { result.setInventoryFinishedPercentage(100); LocalDateTime checkEndTime = new Timestamp(jobItemProgress.getCheckEndTimeMillis()).toLocalDateTime(); @@ -332,26 +351,14 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) { long remainingMills = Math.max(0, (long) ((recordsCount - checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis)); result.setInventoryRemainingSeconds(remainingMills / 1000); } - String tableNames = jobItemProgress.getTableNames(); - result.setTableNames(Optional.ofNullable(tableNames).orElse("")); - result.setCheckBeginTime(DATE_TIME_FORMATTER.format(checkBeginTime)); + } + + private void fillInJobItemInfoWithAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) { ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(checkJobId); result.setAlgorithmType(jobConfig.getAlgorithmTypeName()); if (null != jobConfig.getAlgorithmProps()) { result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(","))); } - result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0)); - Map checkJobResult = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId); - if (checkJobResult.isEmpty()) { - result.setCheckSuccess(null); - } else { - InventoryIncrementalJobAPI inventoryIncrementalJobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService( - PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType()); - result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult)); - } - result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched()) - .map(Entry::getKey).collect(Collectors.joining(","))); - return result; } @Override From 07a8d2cdec741151a2e27f71400001f0e1c377fe Mon Sep 17 00:00:00 2001 From: Hongsheng Zhong Date: Tue, 19 Sep 2023 20:19:00 +0800 Subject: [PATCH 4/5] Remove sleep in E2EIncrementalTask.run --- .../e2e/data/pipeline/cases/task/E2EIncrementalTask.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java index a79edb85aef06..d03bb3b7a8652 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task; import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; @@ -45,7 +44,6 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; @RequiredArgsConstructor @Slf4j @@ -72,7 +70,6 @@ public final class E2EIncrementalTask extends BaseIncrementTask { private final int loopCount; - @SneakyThrows(InterruptedException.class) @Override public void run() { List orderInsertData = PipelineCaseHelper.generateOrderInsertData(databaseType, primaryKeyGenerateAlgorithm, loopCount); @@ -80,19 +77,16 @@ public void run() { for (Object[] each : orderInsertData) { primaryKeys.add(each[0]); insertOrder(each); - TimeUnit.MILLISECONDS.sleep(100L); } ThreadLocalRandom random = ThreadLocalRandom.current(); for (int i = 0; i < Math.max(1, loopCount / 3); i++) { // TODO 0000-00-00 00:00:00 now will cause consistency check failed of MySQL. // DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName) updateOrderById(primaryKeys.get(random.nextInt(0, primaryKeys.size()))); - TimeUnit.MILLISECONDS.sleep(500L); } for (int i = 0; i < Math.max(1, loopCount / 3); i++) { setNullToAllFields(primaryKeys.get(random.nextInt(0, primaryKeys.size()))); deleteOrderById(primaryKeys.remove(random.nextInt(0, primaryKeys.size()))); - TimeUnit.MILLISECONDS.sleep(500L); } log.info("increment task runnable execute successfully."); } From 9f05d334f2f9de5ec47dc4847786b93cc0376750 Mon Sep 17 00:00:00 2001 From: Hongsheng Zhong Date: Tue, 19 Sep 2023 20:25:40 +0800 Subject: [PATCH 5/5] Reduce Cognitive Complexity for ConsistencyCheckJobAPI.getJobItemInfo --- .../api/impl/ConsistencyCheckJobAPI.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 029750013e254..13b2d2a671ec9 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -313,16 +313,10 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) { } fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO); result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse("")); - fillInJobItemInfoWithAlgorithm(result, checkJobId); + fillInJobItemInfoWithCheckAlgorithm(result, checkJobId); result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0)); Map checkJobResult = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId); - if (checkJobResult.isEmpty()) { - result.setCheckSuccess(null); - } else { - InventoryIncrementalJobAPI inventoryIncrementalJobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService( - PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType()); - result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult)); - } + fillInJobItemInfoWithCheckResult(result, checkJobResult, parentJobId); result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched()) .map(Entry::getKey).collect(Collectors.joining(","))); return result; @@ -353,7 +347,7 @@ private void fillInJobItemInfoWithTimes(final ConsistencyCheckJobItemInfo result } } - private void fillInJobItemInfoWithAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) { + private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) { ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(checkJobId); result.setAlgorithmType(jobConfig.getAlgorithmTypeName()); if (null != jobConfig.getAlgorithmProps()) { @@ -361,6 +355,16 @@ private void fillInJobItemInfoWithAlgorithm(final ConsistencyCheckJobItemInfo re } } + private void fillInJobItemInfoWithCheckResult(final ConsistencyCheckJobItemInfo result, final Map checkJobResult, final String parentJobId) { + if (checkJobResult.isEmpty()) { + result.setCheckSuccess(null); + } else { + InventoryIncrementalJobAPI inventoryIncrementalJobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService( + PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType()); + result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult)); + } + } + @Override public ConsistencyCheckJobConfiguration getJobConfiguration(final String jobId) { return getJobConfiguration(getElasticJobConfigPOJO(jobId));