Skip to content

Commit

Permalink
Add YamlPipelineJobItemProgressSwapper
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 18, 2023
1 parent 6c974cf commit 7b15f98
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ default Optional<String> getToBeStoppedPreviousJobType() {
*
* @return pipeline job class
*/
Class<? extends PipelineJob> getPipelineJobClass();
Class<? extends PipelineJob> getJobClass();

@Override
String getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
return Optional.of(jobId);
}
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobAPI.getPipelineJobClass().getName());
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), jobAPI.getJobClass().getName());
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
return Optional.of(jobId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("CDC job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
} else {
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getPipelineJobClass().getName());
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getJobClass().getName());
JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO();
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
Expand Down Expand Up @@ -329,7 +329,7 @@ public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final
}

@Override
public Class<CDCJob> getPipelineJobClass() {
public Class<CDCJob> getJobClass() {
return CDCJob.class;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ public YamlConsistencyCheckJobItemProgressSwapper getYamlJobItemProgressSwapper(
}

@Override
public Class<ConsistencyCheckJob> getPipelineJobClass() {
public Class<ConsistencyCheckJob> getJobClass() {
return ConsistencyCheckJob.class;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ public void refreshTableMetadata(final String jobId, final String databaseName)
}

@Override
public Class<MigrationJob> getPipelineJobClass() {
public Class<MigrationJob> getJobClass() {
return MigrationJob.class;
}

Expand Down

0 comments on commit 7b15f98

Please sign in to comment.