From 309d6f31df767394d0a21d2f4dc4fd085e885236 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Fri, 24 Nov 2023 18:37:31 +0800 Subject: [PATCH] Add PipelineJobConfigurationManager (#29198) * Add PipelineJobConfigurationLoader * Refactor PipelineJobConfigurationLoader * Add PipelineJobConfigurationLoader * Add PipelineJobConfigurationManager --- .../config/job/PipelineJobConfiguration.java | 35 --------- .../core/job/service/PipelineJobAPI.java | 15 +++- .../PipelineJobConfigurationManager.java | 72 +++++++++++++++++++ .../core/job/service/PipelineJobManager.java | 14 +--- .../job/service/TransmissionJobManager.java | 2 +- .../update/CheckMigrationJobUpdater.java | 4 +- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 26 ++++--- .../cdc/config/job/CDCJobConfiguration.java | 15 ---- .../cdc/handler/CDCBackendHandler.java | 8 +-- .../api/impl/ConsistencyCheckJobAPI.java | 4 +- .../ConsistencyCheckJobConfiguration.java | 7 -- .../task/ConsistencyCheckTasksRunner.java | 3 +- .../migration/api/impl/MigrationJobAPI.java | 8 ++- .../config/MigrationJobConfiguration.java | 7 -- .../api/impl/ConsistencyCheckJobAPITest.java | 4 +- .../api/impl/MigrationJobAPITest.java | 10 ++- 16 files changed, 131 insertions(+), 103 deletions(-) create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java index 511c10d2fb9b0..e6f6516b7f482 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/job/PipelineJobConfiguration.java @@ -17,23 +17,13 @@ package org.apache.shardingsphere.data.pipeline.common.config.job; -import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener; -import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; -import org.apache.shardingsphere.infra.util.yaml.YamlEngine; - -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Collections; /** * Pipeline job configuration. */ public interface PipelineJobConfiguration { - DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - /** * Get job id. * @@ -54,29 +44,4 @@ public interface PipelineJobConfiguration { * @return source database type */ DatabaseType getSourceDatabaseType(); - - /** - * Convert to job configuration POJO. - * - * @return converted job configuration POJO - */ - default JobConfigurationPOJO convertToJobConfigurationPOJO() { - JobConfigurationPOJO result = new JobConfigurationPOJO(); - result.setJobName(getJobId()); - result.setShardingTotalCount(getJobShardingCount()); - result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration())); - String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER); - result.getProps().setProperty("create_time", createTimeFormat); - result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); - result.getProps().setProperty("run_count", "1"); - result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName())); - return result; - } - - /** - * Swap to YAML pipeline job configuration. - * - * @return swapped YAML pipeline job configuration - */ - YamlPipelineJobConfiguration swapToYamlJobConfiguration(); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java index 87ae49520dcc0..5e4c86b125f52 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.job.service; +import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob; import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper; @@ -24,6 +25,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper; import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI; +import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration; import java.util.Optional; @@ -36,9 +38,11 @@ public interface PipelineJobAPI extends TypedSPI { /** * Get YAML pipeline job configuration swapper. * + * @param type of YAML configuration + * @param type of pipeline job configuration * @return YAML pipeline job configuration swapper */ - YamlPipelineJobConfigurationSwapper getYamlJobConfigurationSwapper(); + YamlPipelineJobConfigurationSwapper getYamlJobConfigurationSwapper(); /** * Get YAML pipeline job item progress swapper. @@ -75,6 +79,15 @@ default Optional getToBeStoppedPreviousJobType() { return Optional.empty(); } + /** + * Whether to force no sharding when convert to job configuration POJO. + * + * @return without sharding or not + */ + default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() { + return false; + } + /** * Get pipeline job class. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java new file mode 100644 index 0000000000000..3e5d357766058 --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.job.service; + +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; +import org.apache.shardingsphere.infra.util.yaml.YamlEngine; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Collections; + +/** + * Pipeline job configuration manager. + */ +@RequiredArgsConstructor +public final class PipelineJobConfigurationManager { + + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + private final PipelineJobAPI jobAPI; + + /** + * Get job configuration. + * + * @param jobId job ID + * @param type of pipeline job configuration + * @return pipeline job configuration + */ + @SuppressWarnings("unchecked") + public T getJobConfiguration(final String jobId) { + return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter()); + } + + /** + * Convert to job configuration POJO. + * + * @param jobConfig pipeline job configuration + * @return converted job configuration POJO + */ + public JobConfigurationPOJO convertToJobConfigurationPOJO(final PipelineJobConfiguration jobConfig) { + JobConfigurationPOJO result = new JobConfigurationPOJO(); + result.setJobName(jobConfig.getJobId()); + int shardingTotalCount = jobAPI.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount(); + result.setShardingTotalCount(shardingTotalCount); + result.setJobParameter(YamlEngine.marshal(jobAPI.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig))); + String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER); + result.getProps().setProperty("create_time", createTimeFormat); + result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); + result.getProps().setProperty("run_count", "1"); + result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName())); + return result; + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index b5414e71b5411..80890cc2d46d4 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -53,18 +53,6 @@ public final class PipelineJobManager { private final PipelineJobAPI jobAPI; - /** - * Get job configuration. - * - * @param jobId job ID - * @param type of pipeline job configuration - * @return pipeline job configuration - */ - @SuppressWarnings("unchecked") - public T getJobConfiguration(final String jobId) { - return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter()); - } - /** * Start job. * @@ -80,7 +68,7 @@ public Optional start(final PipelineJobConfiguration jobConfig) { return Optional.of(jobId); } governanceFacade.getJobFacade().getJob().create(jobId, jobAPI.getJobClass()); - governanceFacade.getJobFacade().getConfiguration().persist(jobId, jobConfig.convertToJobConfigurationPOJO()); + governanceFacade.getJobFacade().getConfiguration().persist(jobId, new PipelineJobConfigurationManager(jobAPI).convertToJobConfigurationPOJO(jobConfig)); return Optional.of(jobId); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java index cc3f9c464438f..e6b8e9803796c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java @@ -76,7 +76,7 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte * @return job item infos */ public List getJobItemInfos(final String jobId) { - PipelineJobConfiguration jobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(jobId); + PipelineJobConfiguration jobConfig = new PipelineJobConfigurationManager(jobAPI).getJobConfiguration(jobId); long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0")); Map jobProgress = getJobProgress(jobConfig); List result = new LinkedList<>(); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java index 6f72afb054f5a..dfccb6f94053e 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java @@ -19,8 +19,8 @@ import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI; @@ -48,7 +48,7 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName(); Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps(); String jobId = sqlStatement.getJobId(); - MigrationJobConfiguration jobConfig = new PipelineJobManager(migrationJobAPI).getJobConfiguration(jobId); + MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(migrationJobAPI).getJobConfiguration(jobId); verifyInventoryFinished(jobConfig); checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType())); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 839f61bf204cb..416d4ee4204ba 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -36,9 +36,9 @@ import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; -import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager; +import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils; @@ -46,8 +46,8 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory; import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager; import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper; -import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; @@ -66,11 +66,12 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap; @@ -85,6 +86,7 @@ import java.sql.SQLException; import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -100,6 +102,8 @@ @Slf4j public final class CDCJobAPI implements TransmissionJobAPI { + private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine(); @@ -125,7 +129,7 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT log.warn("CDC job already exists in registry center, ignore, job id is `{}`", jobConfig.getJobId()); } else { governanceFacade.getJobFacade().getJob().create(jobConfig.getJobId(), getJobClass()); - JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO(); + JobConfigurationPOJO jobConfigPOJO = new PipelineJobConfigurationManager(this).convertToJobConfigurationPOJO(jobConfig); jobConfigPOJO.setDisabled(true); governanceFacade.getJobFacade().getConfiguration().persist(jobConfig.getJobId(), jobConfigPOJO); if (!param.isFull()) { @@ -222,7 +226,7 @@ public void updateJobConfigurationDisabled(final String jobId, final boolean dis JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); jobConfigPOJO.setDisabled(disabled); if (disabled) { - jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(PipelineJobConfiguration.DATE_TIME_FORMATTER)); + jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER)); jobConfigPOJO.getProps().setProperty("stop_time_millis", String.valueOf(System.currentTimeMillis())); } else { jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); @@ -282,6 +286,7 @@ public CDCProcessContext buildProcessContext(final PipelineJobConfiguration jobC return new CDCProcessContext(jobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()))); } + @SuppressWarnings("unchecked") @Override public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() { return new YamlCDCJobConfigurationSwapper(); @@ -290,10 +295,15 @@ public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() { @Override public PipelineJobInfo getJobInfo(final String jobId) { PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); - CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId); + CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId); return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames())); } + @Override + public boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() { + return true; + } + @Override public void commit(final String jobId) { } @@ -304,7 +314,7 @@ public void commit(final String jobId) { * @param jobId job id */ public void dropStreaming(final String jobId) { - CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId); + CDCJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId); ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active")); new PipelineJobManager(this).drop(jobId); cleanup(jobConfig); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java index c15c38e62432a..8e0aa41737fd6 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java @@ -21,11 +21,8 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType; -import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; -import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import java.util.List; @@ -67,18 +64,6 @@ public int getJobShardingCount() { return jobShardingDataNodes.size(); } - @Override - public JobConfigurationPOJO convertToJobConfigurationPOJO() { - JobConfigurationPOJO result = PipelineJobConfiguration.super.convertToJobConfigurationPOJO(); - result.setShardingTotalCount(1); - return result; - } - - @Override - public YamlPipelineJobConfiguration swapToYamlJobConfiguration() { - return new YamlCDCJobConfigurationSwapper().swapToYamlConfiguration(this); - } - @RequiredArgsConstructor @Getter public static class SinkConfiguration { diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java index 05070a7a901ef..cc0faf0536cc7 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java @@ -47,7 +47,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType; @@ -72,7 +72,7 @@ public final class CDCBackendHandler { private final CDCJobAPI jobAPI = new CDCJobAPI(); - private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI); + private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(jobAPI); /** * Get database name by job ID. @@ -81,7 +81,7 @@ public final class CDCBackendHandler { * @return database */ public String getDatabaseNameByJobId(final String jobId) { - return jobManager.getJobConfiguration(jobId).getDatabaseName(); + return jobConfigManager.getJobConfiguration(jobId).getDatabaseName(); } /** @@ -129,7 +129,7 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod * @param connectionContext connection context */ public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) { - CDCJobConfiguration cdcJobConfig = jobManager.getJobConfiguration(jobId); + CDCJobConfiguration cdcJobConfig = jobConfigManager.getJobConfiguration(jobId); ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId)); if (PipelineJobCenter.isJobExisting(jobId)) { PipelineJobCenter.stop(jobId); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 367b7e19b1774..c02766b6033b7 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob; @@ -265,13 +266,14 @@ private void fillInJobItemInfoWithTimes(final ConsistencyCheckJobItemInfo result } private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) { - ConsistencyCheckJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(checkJobId); + ConsistencyCheckJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(checkJobId); result.setAlgorithmType(jobConfig.getAlgorithmTypeName()); if (null != jobConfig.getAlgorithmProps()) { result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(","))); } } + @SuppressWarnings("unchecked") @Override public YamlConsistencyCheckJobConfigurationSwapper getYamlJobConfigurationSwapper() { return new YamlConsistencyCheckJobConfigurationSwapper(); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java index 578eeeed0e03d..04d647d59715f 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/ConsistencyCheckJobConfiguration.java @@ -21,8 +21,6 @@ import lombok.RequiredArgsConstructor; import lombok.ToString; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import java.util.Properties; @@ -49,9 +47,4 @@ public final class ConsistencyCheckJobConfiguration implements PipelineJobConfig public int getJobShardingCount() { return 1; } - - @Override - public YamlPipelineJobConfiguration swapToYamlJobConfiguration() { - return new YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration(this); - } } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 291d724dfdbf8..291ce0108df4b 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -30,6 +30,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; @@ -103,7 +104,7 @@ protected void runBlocking() { jobItemManager.persistProgress(jobItemContext); JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId); TransmissionJobAPI jobAPI = (TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType()); - PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(parentJobId); + PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobAPI).getJobConfiguration(parentJobId); try { PipelineDataConsistencyChecker checker = jobAPI.buildDataConsistencyChecker( parentJobConfig, jobAPI.buildProcessContext(parentJobConfig), jobItemContext.getProgressContext()); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 331211fdf2d85..8b1918c218a1a 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -53,6 +53,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; @@ -212,7 +213,7 @@ private Map buildTargetTableSchemaMap(final Map sourceTables = new LinkedList<>(); - new PipelineJobManager(this).getJobConfiguration(jobId).getJobShardingDataNodes() + new PipelineJobConfigurationManager(this).getJobConfiguration(jobId).getJobShardingDataNodes() .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode))))); return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables)); } @@ -225,6 +226,7 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina } } + @SuppressWarnings("unchecked") @Override public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() { return new YamlMigrationJobConfigurationSwapper(); @@ -322,7 +324,7 @@ private void dropCheckJobs(final String jobId) { } private void cleanTempTableOnRollback(final String jobId) throws SQLException { - MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId); + MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType()); TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); try ( @@ -346,7 +348,7 @@ public void commit(final String jobId) { PipelineJobManager jobManager = new PipelineJobManager(this); jobManager.stop(jobId); dropCheckJobs(jobId); - MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId); + MigrationJobConfiguration jobConfig = new PipelineJobConfigurationManager(this).getJobConfiguration(jobId); refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName()); jobManager.drop(jobId); log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java index f45a30d512cb0..9ef3c648b07a0 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationJobConfiguration.java @@ -22,9 +22,7 @@ import lombok.ToString; import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; -import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import java.util.List; @@ -69,9 +67,4 @@ public final class MigrationJobConfiguration implements PipelineJobConfiguration public int getJobShardingCount() { return jobShardingDataNodes.size(); } - - @Override - public YamlPipelineJobConfiguration swapToYamlJobConfiguration() { - return new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(this); - } } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java index 1fde78da7f556..5409c4cd9e2be 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java @@ -23,8 +23,8 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter; @@ -69,7 +69,7 @@ void assertCreateJobConfig() { String parentJobId = parentJobConfig.getJobId(); String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType())); - ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(checkJobId); + ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobConfigurationManager(jobAPI).getJobConfiguration(checkJobId); int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE; String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal(); assertThat(checkJobConfig.getJobId(), is(expectCheckJobId)); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 13c3f46d689e8..e38bf513ad4a6 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; @@ -90,6 +91,8 @@ class MigrationJobAPITest { private static MigrationJobAPI jobAPI; + private static PipelineJobConfigurationManager jobConfigManager; + private static PipelineJobManager jobManager; private static TransmissionJobManager transmissionJobManager; @@ -102,6 +105,7 @@ class MigrationJobAPITest { static void beforeClass() { PipelineContextUtils.mockModeConfigAndContextManager(); jobAPI = new MigrationJobAPI(); + jobConfigManager = new PipelineJobConfigurationManager(jobAPI); jobManager = new PipelineJobManager(jobAPI); transmissionJobManager = new TransmissionJobManager(jobAPI); jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); @@ -149,7 +153,7 @@ void assertStartOrStopById() { void assertRollback() throws SQLException { Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(jobId.get()); + MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId.get()); initTableData(jobConfig); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); @@ -161,7 +165,7 @@ void assertRollback() throws SQLException { void assertCommit() { Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(jobId.get()); + MigrationJobConfiguration jobConfig = jobConfigManager.getJobConfiguration(jobId.get()); initTableData(jobConfig); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); @@ -263,7 +267,7 @@ void assertCreateJobConfig() throws SQLException { initIntPrimaryEnvironment(); SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order"); String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db")); - MigrationJobConfiguration actual = jobManager.getJobConfiguration(jobId); + MigrationJobConfiguration actual = jobConfigManager.getJobConfiguration(jobId); assertThat(actual.getTargetDatabaseName(), is("logic_db")); List dataNodeLines = actual.getJobShardingDataNodes(); assertThat(dataNodeLines.size(), is(1));