From 77b8bdd6ac1e44343f0da87d6d6b31ecbb3f2780 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Fri, 17 Nov 2023 23:08:32 +0800 Subject: [PATCH] Add YamlPipelineJobConfigurationSwapper (#29060) --- .../core/job/service/PipelineJobAPI.java | 12 +++--- .../core/job/service/PipelineJobManager.java | 10 +++++ ...bstractInventoryIncrementalJobAPIImpl.java | 4 +- .../YamlPipelineJobConfigurationSwapper.java | 39 +++++++++++++++++++ .../update/CheckMigrationJobUpdater.java | 3 +- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 8 ++-- .../cdc/handler/CDCBackendHandler.java | 7 +++- .../YamlCDCJobConfigurationSwapper.java | 13 ++----- .../api/impl/ConsistencyCheckJobAPI.java | 7 ++-- ...nsistencyCheckJobConfigurationSwapper.java | 11 ++---- .../task/ConsistencyCheckTasksRunner.java | 2 +- .../migration/api/impl/MigrationJobAPI.java | 10 ++--- .../YamlMigrationJobConfigurationSwapper.java | 13 ++----- .../api/impl/ConsistencyCheckJobAPITest.java | 4 +- .../api/impl/MigrationJobAPITest.java | 6 +-- 15 files changed, 94 insertions(+), 55 deletions(-) create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobConfigurationSwapper.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java index 4ac59eefcf944..6a4d6a5fcc4ca 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java @@ -17,12 +17,11 @@ package org.apache.shardingsphere.data.pipeline.core.job.service; -import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob; import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; -import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper; import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI; @@ -35,12 +34,11 @@ public interface PipelineJobAPI extends TypedSPI { /** - * Get job configuration. - * - * @param jobConfigPOJO job configuration POJO - * @return pipeline job configuration + * Get YAML job configuration swapper. + * + * @return YAML job configuration swapper */ - PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO jobConfigPOJO); + YamlPipelineJobConfigurationSwapper getYamlJobConfigurationSwapper(); /** * Whether to ignore to start disabled job when job item progress is finished. diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index 47860f1400c53..be7c8121cb3d4 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -57,6 +57,16 @@ public final class PipelineJobManager { private final PipelineJobAPI pipelineJobAPI; + /** + * Get job configuration. + * + * @param jobConfigPOJO job configuration POJO + * @return pipeline job configuration + */ + public PipelineJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { + return pipelineJobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); + } + /** * Start job. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java index bfc7e8e9fd416..7462f12669e1b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java @@ -102,12 +102,12 @@ public Map getJobProgress(final Pi @Override public List getJobItemInfos(final String jobId) { + PipelineJobManager pipelineJobManager = new PipelineJobManager(this); JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); - PipelineJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO); + PipelineJobConfiguration jobConfig = pipelineJobManager.getJobConfiguration(jobConfigPOJO); long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0")); Map jobProgress = getJobProgress(jobConfig); List result = new LinkedList<>(); - PipelineJobManager pipelineJobManager = new PipelineJobManager(this); for (Entry entry : jobProgress.entrySet()) { int shardingItem = entry.getKey(); TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) getJobInfo(jobId); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobConfigurationSwapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobConfigurationSwapper.java new file mode 100644 index 0000000000000..4482fb831fd91 --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobConfigurationSwapper.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.job.yaml; + +import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration; +import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper; + +/** + * YAML pipeline job configuration swapper. + * + * @param type of YAML configuration + * @param type of swapped pipeline job configuration + */ +public interface YamlPipelineJobConfigurationSwapper extends YamlConfigurationSwapper { + + /** + * Swap to job configuration from text. + * + * @param jobParam job parameter + * @return job configuration + */ + T swapToObject(String jobParam); +} diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java index 547d6255f4b9f..0789b0519b0ac 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java @@ -20,6 +20,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI; @@ -47,7 +48,7 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName(); Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps(); String jobId = sqlStatement.getJobId(); - MigrationJobConfiguration jobConfig = migrationJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); verifyInventoryFinished(jobConfig); checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType())); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 4d6d31a3cfbee..dc0bda5338011 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -279,15 +279,15 @@ public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfigurat } @Override - public CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { - return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); + public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() { + return new YamlCDCJobConfigurationSwapper(); } @Override public TableBasedPipelineJobInfo getJobInfo(final String jobId) { JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO); - CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO); + CDCJobConfiguration jobConfig = (CDCJobConfiguration) new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO); return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames())); } @@ -302,7 +302,7 @@ public void commit(final String jobId) { */ public void dropStreaming(final String jobId) { JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); - CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO); + CDCJobConfiguration jobConfig = (CDCJobConfiguration) new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO); ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active")); new PipelineJobManager(this).drop(jobId); cleanup(jobConfig); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java index 2ef78db661ef0..bc041d9db8133 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java @@ -48,6 +48,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData; import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType; @@ -72,6 +73,8 @@ public final class CDCBackendHandler { private final CDCJobAPI jobAPI = new CDCJobAPI(); + private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI); + /** * Get database name by job ID. * @@ -79,7 +82,7 @@ public final class CDCBackendHandler { * @return database */ public String getDatabaseNameByJobId(final String jobId) { - return jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName(); + return ((CDCJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId))).getDatabaseName(); } /** @@ -127,7 +130,7 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod * @param connectionContext connection context */ public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) { - CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId)); if (PipelineJobCenter.isJobExisting(jobId)) { PipelineJobCenter.stop(jobId); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java index 871ed0eb7eb1d..34ba337f69d4a 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/swapper/YamlCDCJobConfigurationSwapper.java @@ -18,17 +18,17 @@ package org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper; import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration; -import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration.SinkConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType; import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration; import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; +import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper; import java.util.Collections; import java.util.List; @@ -37,7 +37,7 @@ /** * YAML CDC job configuration swapper. */ -public final class YamlCDCJobConfigurationSwapper implements YamlConfigurationSwapper { +public final class YamlCDCJobConfigurationSwapper implements YamlPipelineJobConfigurationSwapper { private final YamlPipelineDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper(); @@ -81,12 +81,7 @@ public CDCJobConfiguration swapToObject(final YamlCDCJobConfiguration yamlConfig jobShardingDataNodes, yamlConfig.isDecodeWithTX(), sinkConfig, yamlConfig.getConcurrency(), yamlConfig.getRetryTimes()); } - /** - * Swap to job configuration from text. - * - * @param jobParam job parameter - * @return job configuration - */ + @Override public CDCJobConfiguration swapToObject(final String jobParam) { return null == jobParam ? null : swapToObject(YamlEngine.unmarshal(jobParam, YamlCDCJobConfiguration.class, true)); } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 88e39ee70d309..e96f1b2e4f304 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -312,7 +312,8 @@ private void fillInJobItemInfoWithTimes(final ConsistencyCheckJobItemInfo result } private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) { - ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); + ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) new PipelineJobManager(this) + .getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); result.setAlgorithmType(jobConfig.getAlgorithmTypeName()); if (null != jobConfig.getAlgorithmProps()) { result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(","))); @@ -330,8 +331,8 @@ private void fillInJobItemInfoWithCheckResult(final ConsistencyCheckJobItemInfo } @Override - public ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { - return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); + public YamlConsistencyCheckJobConfigurationSwapper getYamlJobConfigurationSwapper() { + return new YamlConsistencyCheckJobConfigurationSwapper(); } @Override diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfigurationSwapper.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfigurationSwapper.java index 5431c935f213a..f18ce3f59036f 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfigurationSwapper.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/config/yaml/YamlConsistencyCheckJobConfigurationSwapper.java @@ -17,16 +17,16 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper; /** * YAML consistency check job configuration swapper. */ -public final class YamlConsistencyCheckJobConfigurationSwapper implements YamlConfigurationSwapper { +public final class YamlConsistencyCheckJobConfigurationSwapper implements YamlPipelineJobConfigurationSwapper { @Override public YamlConsistencyCheckJobConfiguration swapToYamlConfiguration(final ConsistencyCheckJobConfiguration data) { @@ -45,12 +45,7 @@ public ConsistencyCheckJobConfiguration swapToObject(final YamlConsistencyCheckJ return new ConsistencyCheckJobConfiguration(yamlConfig.getJobId(), yamlConfig.getParentJobId(), yamlConfig.getAlgorithmTypeName(), yamlConfig.getAlgorithmProps(), databaseType); } - /** - * Swap to job configuration from text. - * - * @param jobParam job parameter - * @return job configuration - */ + @Override public ConsistencyCheckJobConfiguration swapToObject(final String jobParam) { return null == jobParam ? null : swapToObject(YamlEngine.unmarshal(jobParam, YamlConsistencyCheckJobConfiguration.class, true)); } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 9c8b1f0f9491c..43f826dc52cd7 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -98,7 +98,7 @@ protected void runBlocking() { checkJobAPI.persistJobItemProgress(jobItemContext); JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId); InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType()); - PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId)); + PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId)); try { PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker( parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext()); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 52527c619011f..1f2378d05d3bb 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -213,7 +213,7 @@ public TableBasedPipelineJobInfo getJobInfo(final String jobId) { JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO); List sourceTables = new LinkedList<>(); - getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes() + ((MigrationJobConfiguration) new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO)).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes() .forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode))))); return new TableBasedPipelineJobInfo(jobMetaData, String.join(",", sourceTables)); } @@ -227,8 +227,8 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina } @Override - public MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { - return new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); + public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() { + return new YamlMigrationJobConfigurationSwapper(); } @Override @@ -323,7 +323,7 @@ private void dropCheckJobs(final String jobId) { } private void cleanTempTableOnRollback(final String jobId) throws SQLException { - MigrationJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType()); TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap()); try ( @@ -347,7 +347,7 @@ public void commit(final String jobId) { PipelineJobManager jobManager = new PipelineJobManager(this); jobManager.stop(jobId); dropCheckJobs(jobId); - MigrationJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName()); jobManager.drop(jobId); log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java index c5dce6c130669..e7e123319b2b0 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/yaml/job/YamlMigrationJobConfigurationSwapper.java @@ -17,13 +17,13 @@ package org.apache.shardingsphere.data.pipeline.yaml.job; -import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; +import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper; import java.util.LinkedHashMap; import java.util.Map.Entry; @@ -32,7 +32,7 @@ /** * YAML migration job configuration swapper. */ -public final class YamlMigrationJobConfigurationSwapper implements YamlConfigurationSwapper { +public final class YamlMigrationJobConfigurationSwapper implements YamlPipelineJobConfigurationSwapper { private final YamlPipelineDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper(); @@ -67,12 +67,7 @@ public MigrationJobConfiguration swapToObject(final YamlMigrationJobConfiguratio yamlConfig.getConcurrency(), yamlConfig.getRetryTimes()); } - /** - * Swap to migration job configuration from YAML text. - * - * @param jobParam job parameter YAML text - * @return migration job configuration - */ + @Override public MigrationJobConfiguration swapToObject(final String jobParam) { return swapToObject(YamlEngine.unmarshal(jobParam, YamlMigrationJobConfiguration.class, true)); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java index 169f76ab6d52d..6ab0ef41af097 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java @@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter; @@ -64,7 +65,8 @@ void assertCreateJobConfig() { String parentJobId = parentJobConfig.getJobId(); String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null, parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType())); - ConsistencyCheckJobConfiguration checkJobConfig = checkJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); + ConsistencyCheckJobConfiguration checkJobConfig = (ConsistencyCheckJobConfiguration) new PipelineJobManager(checkJobAPI) + .getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId)); int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE; String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal(); assertThat(checkJobConfig.getJobId(), is(expectCheckJobId)); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index bbd98ed6fbd4d..472a14550de05 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -142,7 +142,7 @@ void assertStartOrStopById() { void assertRollback() throws SQLException { Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get())); + MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get())); initTableData(jobConfig); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); @@ -154,7 +154,7 @@ void assertRollback() throws SQLException { void assertCommit() { Optional jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration()); assertTrue(jobId.isPresent()); - MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get())); + MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get())); initTableData(jobConfig); PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class); when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); @@ -277,7 +277,7 @@ void assertCreateJobConfig() throws SQLException { initIntPrimaryEnvironment(); SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order"); String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db")); - MigrationJobConfiguration actual = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); + MigrationJobConfiguration actual = (MigrationJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); assertThat(actual.getTargetDatabaseName(), is("logic_db")); List dataNodeLines = actual.getJobShardingDataNodes(); assertThat(dataNodeLines.size(), is(1));