From 387dd7a7a5d03b8b64257c8ebf41c64e0a6f5cd5 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Thu, 30 Nov 2023 16:01:24 +0800 Subject: [PATCH] Refactor CDCJobAPI --- .../data/pipeline/cdc/api/CDCJobAPI.java | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java index 212a8cc82760a..80b8c0b9f9733 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java @@ -81,13 +81,26 @@ @Slf4j public final class CDCJobAPI implements TransmissionJobAPI { - private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); + private final CDCJobOption jobOption; - private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine(); + private final PipelineJobManager jobManager; - private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper(); + private final PipelineJobConfigurationManager jobConfigManager; - private final CDCJobOption jobOption = new CDCJobOption(); + private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper; + + private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine; + + private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper; + + public CDCJobAPI() { + jobOption = new CDCJobOption(); + jobManager = new PipelineJobManager(jobOption); + jobConfigManager = new PipelineJobConfigurationManager(jobOption); + dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); + ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine(); + pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper(); + } /** * Create CDC job. @@ -108,7 +121,7 @@ public String create(final StreamDataParameter param, final CDCSinkType sinkType log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId()); } else { governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), jobOption.getJobClass()); - JobConfigurationPOJO jobConfigPOJO = new PipelineJobConfigurationManager(jobOption).convertToJobConfigurationPOJO(jobConfig); + JobConfigurationPOJO jobConfigPOJO = jobConfigManager.convertToJobConfigurationPOJO(jobConfig); jobConfigPOJO.setDisabled(true); governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO); if (!param.isFull()) { @@ -231,9 +244,9 @@ public void disable(final String jobId) { * @param jobId job id */ public void drop(final String jobId) { - CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId); + CDCJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId); ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active")); - new PipelineJobManager(jobOption).drop(jobId); + jobManager.drop(jobId); cleanup(jobConfig); }