Skip to content

Commit

Permalink
Add YamlPipelineJobConfigurationSwapper (#29060)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Nov 17, 2023
1 parent 3bb34d7 commit 77b8bdd
Show file tree
Hide file tree
Showing 15 changed files with 94 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

package org.apache.shardingsphere.data.pipeline.core.job.service;

import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;

Expand All @@ -35,12 +34,11 @@
public interface PipelineJobAPI extends TypedSPI {

/**
* Get job configuration.
*
* @param jobConfigPOJO job configuration POJO
* @return pipeline job configuration
* Get YAML job configuration swapper.
*
* @return YAML job configuration swapper
*/
PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);
YamlPipelineJobConfigurationSwapper<?, ?> getYamlJobConfigurationSwapper();

/**
* Whether to ignore to start disabled job when job item progress is finished.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ public final class PipelineJobManager {

private final PipelineJobAPI pipelineJobAPI;

/**
 * Load the pipeline job configuration carried inside the given ElasticJob configuration POJO.
 * Delegates YAML deserialization to the swapper supplied by the job-type-specific API.
 *
 * @param jobConfigPOJO job configuration POJO
 * @return pipeline job configuration
 */
public PipelineJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
    String jobParam = jobConfigPOJO.getJobParameter();
    YamlPipelineJobConfigurationSwapper<?, ?> swapper = pipelineJobAPI.getYamlJobConfigurationSwapper();
    return swapper.swapToObject(jobParam);
}

/**
* Start job.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ public Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(final Pi

@Override
public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId) {
PipelineJobManager pipelineJobManager = new PipelineJobManager(this);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
PipelineJobConfiguration jobConfig = pipelineJobManager.getJobConfiguration(jobConfigPOJO);
long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = getJobProgress(jobConfig);
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
PipelineJobManager pipelineJobManager = new PipelineJobManager(this);
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
int shardingItem = entry.getKey();
TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) getJobInfo(jobId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.core.job.yaml;

import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;

/**
 * YAML pipeline job configuration swapper, converting between a YAML
 * representation and its strongly typed pipeline job configuration.
 * Implementations are returned per job type via {@code PipelineJobAPI#getYamlJobConfigurationSwapper()}.
 *
 * @param <Y> type of YAML configuration
 * @param <T> type of swapped pipeline job configuration
 */
public interface YamlPipelineJobConfigurationSwapper<Y extends YamlConfiguration, T extends PipelineJobConfiguration> extends YamlConfigurationSwapper<Y, T> {

    /**
     * Swap to job configuration from text.
     *
     * @param jobParam job parameter, i.e. the YAML text of the job configuration
     *        (implementations in this change return {@code null} for a {@code null} input — confirm before relying on it)
     * @return job configuration
     */
    T swapToObject(String jobParam);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
Expand Down Expand Up @@ -47,7 +48,7 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme
String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps();
String jobId = sqlStatement.getJobId();
MigrationJobConfiguration jobConfig = migrationJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
verifyInventoryFinished(jobConfig);
checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,15 +279,15 @@ public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfigurat
}

@Override
public CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() {
return new YamlCDCJobConfigurationSwapper();
}

@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO);
CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
CDCJobConfiguration jobConfig = (CDCJobConfiguration) new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
}

Expand All @@ -302,7 +302,7 @@ public void commit(final String jobId) {
*/
public void dropStreaming(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
CDCJobConfiguration jobConfig = (CDCJobConfiguration) new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
new PipelineJobManager(this).drop(jobId);
cleanup(jobConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
Expand All @@ -72,14 +73,16 @@ public final class CDCBackendHandler {

private final CDCJobAPI jobAPI = new CDCJobAPI();

private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI);

/**
 * Resolve the database name that the identified streaming job operates on.
 *
 * @param jobId job ID
 * @return database
 */
public String getDatabaseNameByJobId(final String jobId) {
    JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
    CDCJobConfiguration jobConfig = (CDCJobConfiguration) jobManager.getJobConfiguration(jobConfigPOJO);
    return jobConfig.getDatabaseName();
}

/**
Expand Down Expand Up @@ -127,7 +130,7 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod
* @param connectionContext connection context
*/
public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
if (PipelineJobCenter.isJobExisting(jobId)) {
PipelineJobCenter.stop(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
package org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper;

import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration.SinkConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;

import java.util.Collections;
import java.util.List;
Expand All @@ -37,7 +37,7 @@
/**
* YAML CDC job configuration swapper.
*/
public final class YamlCDCJobConfigurationSwapper implements YamlConfigurationSwapper<YamlCDCJobConfiguration, CDCJobConfiguration> {
public final class YamlCDCJobConfigurationSwapper implements YamlPipelineJobConfigurationSwapper<YamlCDCJobConfiguration, CDCJobConfiguration> {

private final YamlPipelineDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper();

Expand Down Expand Up @@ -81,12 +81,7 @@ public CDCJobConfiguration swapToObject(final YamlCDCJobConfiguration yamlConfig
jobShardingDataNodes, yamlConfig.isDecodeWithTX(), sinkConfig, yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
}

/**
* Swap to job configuration from text.
*
* @param jobParam job parameter
* @return job configuration
*/
@Override
public CDCJobConfiguration swapToObject(final String jobParam) {
return null == jobParam ? null : swapToObject(YamlEngine.unmarshal(jobParam, YamlCDCJobConfiguration.class, true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ private void fillInJobItemInfoWithTimes(final ConsistencyCheckJobItemInfo result
}

private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) {
ConsistencyCheckJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) new PipelineJobManager(this)
.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
if (null != jobConfig.getAlgorithmProps()) {
result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(",")));
Expand All @@ -330,8 +331,8 @@ private void fillInJobItemInfoWithCheckResult(final ConsistencyCheckJobItemInfo
}

@Override
public ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
public YamlConsistencyCheckJobConfigurationSwapper getYamlJobConfigurationSwapper() {
return new YamlConsistencyCheckJobConfigurationSwapper();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@

package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml;

import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;

/**
* YAML consistency check job configuration swapper.
*/
public final class YamlConsistencyCheckJobConfigurationSwapper implements YamlConfigurationSwapper<YamlConsistencyCheckJobConfiguration, ConsistencyCheckJobConfiguration> {
public final class YamlConsistencyCheckJobConfigurationSwapper implements YamlPipelineJobConfigurationSwapper<YamlConsistencyCheckJobConfiguration, ConsistencyCheckJobConfiguration> {

@Override
public YamlConsistencyCheckJobConfiguration swapToYamlConfiguration(final ConsistencyCheckJobConfiguration data) {
Expand All @@ -45,12 +45,7 @@ public ConsistencyCheckJobConfiguration swapToObject(final YamlConsistencyCheckJ
return new ConsistencyCheckJobConfiguration(yamlConfig.getJobId(), yamlConfig.getParentJobId(), yamlConfig.getAlgorithmTypeName(), yamlConfig.getAlgorithmProps(), databaseType);
}

/**
* Swap to job configuration from text.
*
* @param jobParam job parameter
* @return job configuration
*/
@Override
public ConsistencyCheckJobConfiguration swapToObject(final String jobParam) {
return null == jobParam ? null : swapToObject(YamlEngine.unmarshal(jobParam, YamlConsistencyCheckJobConfiguration.class, true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected void runBlocking() {
checkJobAPI.persistJobItemProgress(jobItemContext);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
PipelineJobConfiguration parentJobConfig = jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
try {
PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker(
parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO);
List<String> sourceTables = new LinkedList<>();
getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes()
((MigrationJobConfiguration) new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO)).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes()
.forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
return new TableBasedPipelineJobInfo(jobMetaData, String.join(",", sourceTables));
}
Expand All @@ -227,8 +227,8 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina
}

@Override
public MigrationJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
return new YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() {
return new YamlMigrationJobConfigurationSwapper();
}

@Override
Expand Down Expand Up @@ -323,7 +323,7 @@ private void dropCheckJobs(final String jobId) {
}

private void cleanTempTableOnRollback(final String jobId) throws SQLException {
MigrationJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
Expand All @@ -347,7 +347,7 @@ public void commit(final String jobId) {
PipelineJobManager jobManager = new PipelineJobManager(this);
jobManager.stop(jobId);
dropCheckJobs(jobId);
MigrationJobConfiguration jobConfig = getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() - startTimeMillis);
Expand Down
Loading

0 comments on commit 77b8bdd

Please sign in to comment.