Skip to content

Commit

Permalink
Move AbstractPipelineJobAPIImpl.convertJobConfiguration and swapToYam…
Browse files Browse the repository at this point in the history
…lJobConfiguration to PipelineJobConfiguration (#29030)

* Use PipelineJobAPI.getPipelineJobClass() to instead of AbstractPipelineJobAPIImpl.getJobClassName()

* Move AbstractPipelineJobAPIImpl.convertJobConfiguration and swapToYamlJobConfiguration to PipelineJobConfiguration
  • Loading branch information
terrymanu authored Nov 13, 2023
1 parent b6d11fe commit 92dbc4b
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,23 @@

package org.apache.shardingsphere.data.pipeline.common.config.job;

import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;

/**
* Pipeline job configuration.
*/
public interface PipelineJobConfiguration {

DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

/**
* Get job id.
*
Expand All @@ -44,4 +54,29 @@ public interface PipelineJobConfiguration {
* @return source database type
*/
DatabaseType getSourceDatabaseType();

/**
* Convert to job configuration POJO.
*
* @return converted job configuration POJO
*/
default JobConfigurationPOJO convertToJobConfigurationPOJO() {
JobConfigurationPOJO result = new JobConfigurationPOJO();
result.setJobName(getJobId());
result.setShardingTotalCount(getJobShardingCount());
result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration()));
String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER);
result.getProps().setProperty("create_time", createTimeFormat);
result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
result.getProps().setProperty("run_count", "1");
result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
return result;
}

/**
* Swap to YAML pipeline job configuration.
*
* @return swapped YAML pipeline job configuration
*/
YamlPipelineJobConfiguration swapToYamlJobConfiguration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
Expand All @@ -38,7 +36,6 @@

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

Expand All @@ -61,25 +58,10 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
return Optional.of(jobId);
}
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), getPipelineJobClass().getName());
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(convertJobConfiguration(jobConfig)));
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
return Optional.of(jobId);
}

protected JobConfigurationPOJO convertJobConfiguration(final PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO result = new JobConfigurationPOJO();
result.setJobName(jobConfig.getJobId());
result.setShardingTotalCount(jobConfig.getJobShardingCount());
result.setJobParameter(YamlEngine.marshal(swapToYamlJobConfiguration(jobConfig)));
String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER);
result.getProps().setProperty("create_time", createTimeFormat);
result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
result.getProps().setProperty("run_count", "1");
result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
return result;
}

protected abstract YamlPipelineJobConfiguration swapToYamlJobConfiguration(PipelineJobConfiguration jobConfig);

protected abstract PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public String createJob(final StreamDataParameter param, final CDCSinkType sinkT
log.warn("CDC job already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
} else {
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()), getPipelineJobClass().getName());
JobConfigurationPOJO jobConfigPOJO = convertJobConfiguration(jobConfig);
JobConfigurationPOJO jobConfigPOJO = jobConfig.convertToJobConfigurationPOJO();
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfigPOJO));
if (!param.isFull()) {
Expand Down Expand Up @@ -193,13 +193,6 @@ private static InventoryIncrementalJobItemProgress getInventoryIncrementalJobIte
return result;
}

@Override
protected JobConfigurationPOJO convertJobConfiguration(final PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO result = super.convertJobConfiguration(jobConfig);
result.setShardingTotalCount(1);
return result;
}

/**
* Start job.
*
Expand Down Expand Up @@ -294,11 +287,6 @@ protected CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO job
return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}

@Override
protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) {
return new YamlCDCJobConfigurationSwapper().swapToYamlConfiguration((CDCJobConfiguration) jobConfig);
}

@Override
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

import java.util.List;
Expand Down Expand Up @@ -64,6 +67,18 @@ public int getJobShardingCount() {
return jobShardingDataNodes.size();
}

@Override
public JobConfigurationPOJO convertToJobConfigurationPOJO() {
JobConfigurationPOJO result = PipelineJobConfiguration.super.convertToJobConfigurationPOJO();
result.setShardingTotalCount(1);
return result;
}

@Override
public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
return new YamlCDCJobConfigurationSwapper().swapToYamlConfiguration(this);
}

@RequiredArgsConstructor
@Getter
public static class SinkConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,6 @@ protected ConsistencyCheckJobConfiguration getJobConfiguration(final JobConfigur
return new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}

@Override
protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) {
return new YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration((ConsistencyCheckJobConfiguration) jobConfig);
}

@Override
public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

import java.util.Properties;
Expand All @@ -43,13 +45,13 @@ public final class ConsistencyCheckJobConfiguration implements PipelineJobConfig

private final DatabaseType sourceDatabaseType;

/**
* Get job sharding count.
*
* @return job sharding count
*/
@Override
public int getJobShardingCount() {
return 1;
}

@Override
public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
return new YamlConsistencyCheckJobConfigurationSwapper().swapToYamlConfiguration(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,6 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina
}
}

@Override
protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) {
return new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration((MigrationJobConfiguration) jobConfig);
}

@Override
public MigrationJobConfiguration getJobConfiguration(final String jobId) {
return getJobConfiguration(getElasticJobConfigPOJO(jobId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;

import java.util.List;
Expand Down Expand Up @@ -67,4 +69,9 @@ public final class MigrationJobConfiguration implements PipelineJobConfiguration
public int getJobShardingCount() {
return jobShardingDataNodes.size();
}

@Override
public YamlPipelineJobConfiguration swapToYamlJobConfiguration() {
return new YamlMigrationJobConfigurationSwapper().swapToYamlConfiguration(this);
}
}

0 comments on commit 92dbc4b

Please sign in to comment.