Skip to content

Commit

Permalink
Add YamlPipelineJobItemProgressSwapper
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 18, 2023
1 parent f7517c1 commit 0eb0c5a
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
public interface InventoryIncrementalJobAPI extends PipelineJobAPI {

@Override
default YamlInventoryIncrementalJobItemProgressSwapper getYamlPipelineJobItemProgressSwapper() {
default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwapper() {
return new YamlInventoryIncrementalJobItemProgressSwapper();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public interface PipelineJobAPI extends TypedSPI {
* @return YAML pipeline job item progress swapper
*/
@SuppressWarnings("rawtypes")
YamlPipelineJobItemProgressSwapper getYamlPipelineJobItemProgressSwapper();
YamlPipelineJobItemProgressSwapper getYamlJobItemProgressSwapper();

/**
* Whether to ignore to start disabled job when job item progress is finished.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void updateJobItemProgress(final PipelineJobItemContext jobItemContext) {

@SuppressWarnings("unchecked")
private String convertJobItemProgress(final PipelineJobItemContext jobItemContext) {
return YamlEngine.marshal(jobAPI.getYamlPipelineJobItemProgressSwapper().swapToYamlConfiguration(jobItemContext.toProgress()));
return YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemContext.toProgress()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public JobOffsetInfo getJobOffsetInfo(final String jobId) {
@Override
public Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
Optional<String> progress = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId, shardingItem);
return progress.map(optional -> getYamlPipelineJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(optional, YamlInventoryIncrementalJobItemProgress.class)));
return progress.map(optional -> getYamlJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(optional, YamlInventoryIncrementalJobItemProgress.class)));
}

@Override
Expand All @@ -144,7 +144,7 @@ public void updateJobItemStatus(final String jobId, final int shardingItem, fina
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem,
YamlEngine.marshal(getYamlPipelineJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
jobId, i, YamlEngine.marshal(getYamlPipelineJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
jobId, i, YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
}
} catch (final SQLException ex) {
throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
@Override
public Optional<ConsistencyCheckJobItemProgress> getJobItemProgress(final String jobId, final int shardingItem) {
Optional<String> progress = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId, shardingItem);
return progress.map(s -> getYamlPipelineJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(s, YamlConsistencyCheckJobItemProgress.class, true)));
return progress.map(s -> getYamlJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(s, YamlConsistencyCheckJobItemProgress.class, true)));
}

@Override
Expand All @@ -132,7 +132,7 @@ public void updateJobItemStatus(final String jobId, final int shardingItem, fina
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem,
YamlEngine.marshal(getYamlPipelineJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
}

/**
Expand Down Expand Up @@ -307,7 +307,7 @@ public YamlConsistencyCheckJobConfigurationSwapper getYamlJobConfigurationSwappe
}

@Override
public YamlConsistencyCheckJobItemProgressSwapper getYamlPipelineJobItemProgressSwapper() {
public YamlConsistencyCheckJobItemProgressSwapper getYamlJobItemProgressSwapper() {
return new YamlConsistencyCheckJobItemProgressSwapper();
}

Expand Down

0 comments on commit 0eb0c5a

Please sign in to comment.