diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java index e6f6516b7f482..511c10d2fb9b0 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java @@ -17,13 +17,23 @@ package org.apache.shardingsphere.data.pipeline.common.config.job; +import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener; +import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.util.yaml.YamlEngine; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Collections; /** * Pipeline job configuration. */ public interface PipelineJobConfiguration { + DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + /** * Get job id. * @@ -44,4 +54,29 @@ public interface PipelineJobConfiguration { * @return source database type */ DatabaseType getSourceDatabaseType(); + + /** + * Convert to job configuration POJO. + * + * @return converted job configuration POJO + */ + default JobConfigurationPOJO convertToJobConfigurationPOJO() { + JobConfigurationPOJO result = new JobConfigurationPOJO(); + result.setJobName(getJobId()); + result.setShardingTotalCount(getJobShardingCount()); + result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration())); + String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER); + result.getProps().setProperty("create_time", createTimeFormat); + result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); + result.getProps().setProperty("run_count", "1"); + result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName())); + return result; + } + + /** + * Swap to YAML pipeline job configuration. + * + * @return swapped YAML pipeline job configuration + */ + YamlPipelineJobConfiguration swapToYamlJobConfiguration(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java index fa8a1f78e253e..f2ef4ea175419 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java @@ -20,9 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI; import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier; @@ -38,7 +36,6 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; -import java.util.Collections; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -61,25 +58,10 @@ public Optional start(final PipelineJobConfiguration jobConfig) { return Optional.of(jobId); } repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), getPipelineJobClass().getName()); - repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(convertJobConfiguration(jobConfig))); + repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO())); return Optional.of(jobId); } - protected JobConfigurationPOJO convertJobConfiguration(final PipelineJobConfiguration jobConfig) { - JobConfigurationPOJO result = new JobConfigurationPOJO(); - result.setJobName(jobConfig.getJobId()); - result.setShardingTotalCount(jobConfig.getJobShardingCount()); - result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration(jobConfig))); - String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER); - result.getProps().setProperty("create_time", createTimeFormat); - result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); - result.getProps().setProperty("run_count", "1"); - result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName())); - return result; - } - - protected abstract YamlPipelineJobConfiguration swapToYamlJobConfiguration(PipelineJobConfiguration jobConfig); - protected abstract PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO jobConfigPOJO); @Override diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 9c99ae5f0967f..847987eb0b184 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -123,7 +123,7 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT log.warn("CDC job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey); } else { repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getPipelineJobClass().getName()); - JobConfigurationPOJO jobConfigPOJO = convertJobConfiguration(jobConfig); + JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO(); jobConfigPOJO.setDisabled(true); repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO)); if (!param.isFull()) { @@ -193,13 +193,6 @@ private static InventoryIncrementalJobItemProgress getInventoryIncrementalJobIte return result; } - @Override - protected JobConfigurationPOJO convertJobConfiguration(final PipelineJobConfiguration jobConfig) { - JobConfigurationPOJO result = super.convertJobConfiguration(jobConfig); - result.setShardingTotalCount(1); - return result; - } - /** * Start job. * @@ -294,11 +287,6 @@ protected CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO job return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); } - @Override - protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) { - return new YamlCDCJobConfigurationSwapper().swapToYamlConfiguration((CDCJobConfiguration) jobConfig); - } - @Override public TableBasedPipelineJobInfo getJobInfo(final String jobId) { JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java index 8e0aa41737fd6..c15c38e62432a 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java @@ -21,8 +21,11 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; +import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import java.util.List; @@ -64,6 +67,18 @@ public int getJobShardingCount() { return jobShardingDataNodes.size(); } + @Override + public JobConfigurationPOJO convertToJobConfigurationPOJO() { + JobConfigurationPOJO result = PipelineJobConfiguration.super.convertToJobConfigurationPOJO(); + result.setShardingTotalCount(1); + return result; + } + + @Override + public YamlPipelineJobConfiguration swapToYamlJobConfiguration() { + return new YamlCDCJobConfigurationSwapper().swapToYamlConfiguration(this); + } + @RequiredArgsConstructor @Getter public static class SinkConfiguration { diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 58d4051ea0829..f267b56aced23 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -365,11 +365,6 @@ protected ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigur return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); } - @Override - protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) { - return new YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration((ConsistencyCheckJobConfiguration) jobConfig); - } - @Override public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) { } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java index a969417d96fc9..578eeeed0e03d 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java @@ -21,6 +21,8 @@ import lombok.RequiredArgsConstructor; import lombok.ToString; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import java.util.Properties; @@ -43,13 +45,13 @@ public final class ConsistencyCheckJobConfiguration implements PipelineJobConfig private final DatabaseType sourceDatabaseType; - /** - * Get job sharding count. - * - * @return job sharding count - */ @Override public int getJobShardingCount() { return 1; } + + @Override + public YamlPipelineJobConfiguration swapToYamlJobConfiguration() { + return new YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration(this); + } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 4a6a2b2c37cbc..ce2e5116377fa 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -226,11 +226,6 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina } } - @Override - protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) { - return new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration((MigrationJobConfiguration) jobConfig); - } - @Override public MigrationJobConfiguration getJobConfiguration(final String jobId) { return getJobConfiguration(getElasticJobConfigPOJO(jobId)); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java index 9ef3c648b07a0..f45a30d512cb0 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java @@ -22,7 +22,9 @@ import lombok.ToString; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; +import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import java.util.List; @@ -67,4 +69,9 @@ public final class MigrationJobConfiguration implements PipelineJobConfiguration public int getJobShardingCount() { return jobShardingDataNodes.size(); } + + @Override + public YamlPipelineJobConfiguration swapToYamlJobConfiguration() { + return new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(this); + } }