From dfd875238d345a3436d8c2b939e15ea0083b09c5 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 18 Nov 2023 22:58:22 +0800 Subject: [PATCH] Add InventoryIncrementalJobManager --- .../InventoryIncrementalJobManager.java | 21 +++---------------- 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java index 1a29cd942b7d8..efe7ce8af46ae 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java @@ -19,7 +19,6 @@ import com.google.common.base.Preconditions; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; @@ -43,7 +42,6 @@ import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -52,15 +50,12 @@ * Inventory incremental job manager. */ @RequiredArgsConstructor -@Slf4j public final class InventoryIncrementalJobManager { private final PipelineJobAPI jobAPI; private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); - private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new YamlJobOffsetInfoSwapper(); - /** * Alter process configuration. * @@ -106,7 +101,7 @@ public Map getJobProgress(final Pi * @param jobOffsetInfo job offset info */ public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo jobOffsetInfo) { - String value = YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo)); + String value = YamlEngine.marshal(new YamlJobOffsetInfoSwapper().swapToYamlConfiguration(jobOffsetInfo)); PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId, value); } @@ -118,11 +113,7 @@ public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo jobOffs */ public JobOffsetInfo getJobOffsetInfo(final String jobId) { Optional offsetInfo = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId); - if (offsetInfo.isPresent()) { - YamlJobOffsetInfo info = YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class); - return jobOffsetInfoSwapper.swapToObject(info); - } - return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo()); + return new YamlJobOffsetInfoSwapper().swapToObject(offsetInfo.isPresent() ? YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class) : new YamlJobOffsetInfo()); } /** @@ -153,12 +144,6 @@ private Collection getSupportedDatabaseTypes(final Collection checkResults) { Preconditions.checkArgument(!checkResults.isEmpty(), "checkResults empty, jobId:", jobId); - for (Entry entry : checkResults.entrySet()) { - TableDataConsistencyCheckResult checkResult = entry.getValue(); - if (!checkResult.isMatched()) { - return false; - } - } - return true; + return checkResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched); } }