Skip to content

Commit

Permalink
Add InventoryIncrementalJobManager
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 18, 2023
1 parent 2781579 commit dfd8752
Showing 1 changed file with 3 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.common.base.Preconditions;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
Expand All @@ -43,7 +42,6 @@
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -52,15 +50,12 @@
* Inventory incremental job manager.
*/
@RequiredArgsConstructor
@Slf4j
public final class InventoryIncrementalJobManager {

private final PipelineJobAPI jobAPI;

private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();

private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new YamlJobOffsetInfoSwapper();

/**
* Alter process configuration.
*
Expand Down Expand Up @@ -106,7 +101,7 @@ public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final Pi
* @param jobOffsetInfo job offset info
*/
public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo jobOffsetInfo) {
String value = YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo));
String value = YamlEngine.marshal(new YamlJobOffsetInfoSwapper().swapToYamlConfiguration(jobOffsetInfo));
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId, value);
}

Expand All @@ -118,11 +113,7 @@ public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo jobOffs
*/
public JobOffsetInfo getJobOffsetInfo(final String jobId) {
Optional<String> offsetInfo = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
if (offsetInfo.isPresent()) {
YamlJobOffsetInfo info = YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class);
return jobOffsetInfoSwapper.swapToObject(info);
}
return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
return new YamlJobOffsetInfoSwapper().swapToObject(offsetInfo.isPresent() ? YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class) : new YamlJobOffsetInfo());
}

/**
Expand Down Expand Up @@ -153,12 +144,6 @@ private Collection<DatabaseType> getSupportedDatabaseTypes(final Collection<Data
*/
public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map<String, TableDataConsistencyCheckResult> checkResults) {
Preconditions.checkArgument(!checkResults.isEmpty(), "checkResults empty, jobId:", jobId);
for (Entry<String, TableDataConsistencyCheckResult> entry : checkResults.entrySet()) {
TableDataConsistencyCheckResult checkResult = entry.getValue();
if (!checkResult.isMatched()) {
return false;
}
}
return true;
return checkResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched);
}
}

0 comments on commit dfd8752

Please sign in to comment.