Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sonar issue of pipeline #28468

Merged
merged 5 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,21 @@ public static class YamlTableDataConsistencyCountCheckResult implements YamlConf
private long targetRecordsCount;

/**
* Add records count.
* Add source records count.
*
* @param delta delta count
* @param onSource add on source or target
*/
public void addRecordsCount(final long delta, final boolean onSource) {
if (onSource) {
sourceRecordsCount += delta;
} else {
targetRecordsCount += delta;
}
public void addSourceRecordsCount(final long delta) {
sourceRecordsCount += delta;
}

/**
* Add target records count.
*
* @param delta delta count
*/
public void addTargetRecordsCount(final long delta) {
targetRecordsCount += delta;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iter
}
SingleTableInventoryCalculatedResult sourceCalculatedResult = waitFuture(executor.submit(sourceCalculatedResults::next));
SingleTableInventoryCalculatedResult targetCalculatedResult = waitFuture(executor.submit(targetCalculatedResults::next));
checkResult.getCountCheckResult().addRecordsCount(sourceCalculatedResult.getRecordsCount(), true);
checkResult.getCountCheckResult().addRecordsCount(targetCalculatedResult.getRecordsCount(), false);
checkResult.getCountCheckResult().addSourceRecordsCount(sourceCalculatedResult.getRecordsCount());
checkResult.getCountCheckResult().addTargetRecordsCount(targetCalculatedResult.getRecordsCount());
if (!Objects.equals(sourceCalculatedResult, targetCalculatedResult)) {
checkResult.getContentCheckResult().setMatched(false);
log.info("content matched false, jobId={}, sourceTable={}, targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), param.getTargetTable(), param.getUniqueKeys());
Expand All @@ -117,12 +117,12 @@ private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iter
}
if (sourceCalculatedResults.hasNext()) {
// TODO Refactor SingleTableInventoryCalculatedResult to represent inaccurate number
checkResult.getCountCheckResult().addRecordsCount(1, true);
checkResult.getCountCheckResult().addSourceRecordsCount(1);
checkResult.getContentCheckResult().setMatched(false);
return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
if (targetCalculatedResults.hasNext()) {
checkResult.getCountCheckResult().addRecordsCount(1, false);
checkResult.getCountCheckResult().addTargetRecordsCount(1);
checkResult.getContentCheckResult().setMatched(false);
return new YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,29 @@

package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;

import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.DataMatchTableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.junit.jupiter.api.Test;
import org.mockito.internal.configuration.plugins.Plugins;

import java.util.Arrays;
import java.util.Properties;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

class DataMatchTableDataConsistencyCheckerTest {

@SneakyThrows(ReflectiveOperationException.class)
@Test
void assertInitSuccess() {
for (String each : Arrays.asList("1", "1000")) {
new DataMatchTableDataConsistencyChecker().init(buildAlgorithmProperties(each));
DataMatchTableDataConsistencyChecker checker = new DataMatchTableDataConsistencyChecker();
checker.init(buildAlgorithmProperties(each));
String actual = Plugins.getMemberAccessor().get(DataMatchTableDataConsistencyChecker.class.getDeclaredField("chunkSize"), checker).toString();
assertThat(actual, is(each));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,22 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) {
result.setCheckSuccess(null);
return result;
}
LocalDateTime checkBeginTime = new Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO);
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0));
Map<String, TableDataConsistencyCheckResult> checkJobResult = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
fillInJobItemInfoWithCheckResult(result, checkJobResult, parentJobId);
result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched())
.map(Entry::getKey).collect(Collectors.joining(",")));
return result;
}

private void fillInJobItemInfoWithTimes(final ConsistencyCheckJobItemInfo result, final ConsistencyCheckJobItemProgress jobItemProgress, final JobConfigurationPOJO jobConfigPOJO) {
long recordsCount = jobItemProgress.getRecordsCount();
long checkedRecordsCount = Math.min(jobItemProgress.getCheckedRecordsCount(), recordsCount);
LocalDateTime checkBeginTime = new Timestamp(jobItemProgress.getCheckBeginTimeMillis()).toLocalDateTime();
result.setCheckBeginTime(DATE_TIME_FORMATTER.format(checkBeginTime));
if (JobStatus.FINISHED == jobItemProgress.getStatus()) {
result.setInventoryFinishedPercentage(100);
LocalDateTime checkEndTime = new Timestamp(jobItemProgress.getCheckEndTimeMillis()).toLocalDateTime();
Expand All @@ -332,26 +345,24 @@ private ConsistencyCheckJobItemInfo getJobItemInfo(final String parentJobId) {
long remainingMills = Math.max(0, (long) ((recordsCount - checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis));
result.setInventoryRemainingSeconds(remainingMills / 1000);
}
String tableNames = jobItemProgress.getTableNames();
result.setTableNames(Optional.ofNullable(tableNames).orElse(""));
result.setCheckBeginTime(DATE_TIME_FORMATTER.format(checkBeginTime));
}

private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) {
ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(checkJobId);
result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
if (null != jobConfig.getAlgorithmProps()) {
result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(",")));
}
result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0));
Map<String, TableDataConsistencyCheckResult> checkJobResult = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
}

private void fillInJobItemInfoWithCheckResult(final ConsistencyCheckJobItemInfo result, final Map<String, TableDataConsistencyCheckResult> checkJobResult, final String parentJobId) {
if (checkJobResult.isEmpty()) {
result.setCheckSuccess(null);
} else {
InventoryIncrementalJobAPI inventoryIncrementalJobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(
PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType());
result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult));
}
result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched())
.map(Entry::getKey).collect(Collectors.joining(",")));
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;

import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
Expand All @@ -45,7 +44,6 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

@RequiredArgsConstructor
@Slf4j
Expand All @@ -72,27 +70,23 @@ public final class E2EIncrementalTask extends BaseIncrementTask {

private final int loopCount;

@SneakyThrows(InterruptedException.class)
@Override
public void run() {
List<Object[]> orderInsertData = PipelineCaseHelper.generateOrderInsertData(databaseType, primaryKeyGenerateAlgorithm, loopCount);
List<Object> primaryKeys = new LinkedList<>();
for (Object[] each : orderInsertData) {
primaryKeys.add(each[0]);
insertOrder(each);
TimeUnit.MILLISECONDS.sleep(100L);
}
ThreadLocalRandom random = ThreadLocalRandom.current();
for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
// TODO 0000-00-00 00:00:00 now will cause consistency check failed of MySQL.
// DataSourceUtil.execute(dataSource, String.format("UPDATE %s SET t_datetime='0000-00-00 00:00:00' WHERE order_id = ?", orderTableName)
updateOrderById(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
TimeUnit.MILLISECONDS.sleep(500L);
}
for (int i = 0; i < Math.max(1, loopCount / 3); i++) {
setNullToAllFields(primaryKeys.get(random.nextInt(0, primaryKeys.size())));
deleteOrderById(primaryKeys.remove(random.nextInt(0, primaryKeys.size())));
TimeUnit.MILLISECONDS.sleep(500L);
}
log.info("increment task runnable execute successfully.");
}
Expand Down