Skip to content

Commit

Permalink
Refactor CDCJobAPI (#29239)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 30, 2023
1 parent e0934fc commit 21da3a9
Showing 1 changed file with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,26 @@
@Slf4j
public final class CDCJobAPI implements TransmissionJobAPI {

private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
private final CDCJobOption jobOption;

private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
private final PipelineJobManager jobManager;

private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
private final PipelineJobConfigurationManager jobConfigManager;

private final CDCJobOption jobOption = new CDCJobOption();
private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper;

private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine;

private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper;

public CDCJobAPI() {
jobOption = new CDCJobOption();
jobManager = new PipelineJobManager(jobOption);
jobConfigManager = new PipelineJobConfigurationManager(jobOption);
dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine();
pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();
}

/**
* Create CDC job.
Expand All @@ -108,7 +121,7 @@ public String create(final StreamDataParameter param, final CDCSinkType sinkType
log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId());
} else {
governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), jobOption.getJobClass());
JobConfigurationPOJO jobConfigPOJO = new PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig);
JobConfigurationPOJO jobConfigPOJO = jobConfigManager.convertToJobConfigurationPOJO(jobConfig);
jobConfigPOJO.setDisabled(true);
governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO);
if (!param.isFull()) {
Expand Down Expand Up @@ -231,9 +244,9 @@ public void disable(final String jobId) {
* @param jobId job id
*/
public void drop(final String jobId) {
CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
CDCJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId);
ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
new PipelineJobManager(jobOption).drop(jobId);
jobManager.drop(jobId);
cleanup(jobConfig);
}

Expand Down

0 comments on commit 21da3a9

Please sign in to comment.