Merge PipelineJobType and PipelineJobOption (#29270)
* Merge PipelineJobType and PipelineJobOption

terrymanu authored Dec 3, 2023
1 parent 92d76f1 commit acd872e
Showing 41 changed files with 376 additions and 513 deletions.
@@ -17,9 +17,21 @@

package org.apache.shardingsphere.data.pipeline.common.job.type;

import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;

import java.util.Optional;

/**
* Pipeline job type.
@@ -35,11 +47,82 @@ public interface PipelineJobType extends TypedSPI {
String getCode();

/**
* Get job option.
* Get YAML pipeline job configuration swapper.
*
* @param <T> type of YAML configuration
* @param <Y> type of pipeline job configuration
* @return YAML pipeline job configuration swapper
*/
<Y extends YamlConfiguration, T extends PipelineJobConfiguration> YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper();

/**
* Get YAML pipeline job item progress swapper.
*
* @param <T> type of pipeline job item progress
* @return YAML pipeline job item progress swapper
*/
<T extends PipelineJobItemProgress> YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> getYamlJobItemProgressSwapper();

/**
* Get pipeline job class.
*
* @return pipeline job class
*/
Class<? extends PipelineJob> getJobClass();

/**
* Whether to ignore to start disabled job when job item progress is finished.
*
* @return ignore to start disabled job when job item progress is finished or not
*/
default boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
return false;
}

/**
* Get to be start disabled next job type.
*
* @return to be start disabled next job type
*/
default Optional<String> getToBeStartDisabledNextJobType() {
return Optional.empty();
}

/**
* Get to be stopped previous job type.
*
* @return to be stopped previous job type
*/
default Optional<String> getToBeStoppedPreviousJobType() {
return Optional.empty();
}

/**
* Whether to force no sharding when convert to job configuration POJO.
*
* @return without sharding or not
*/
default boolean isForceNoShardingWhenConvertToJobConfigurationPOJO() {
return false;
}

/**
* Get pipeline job info.
*
* @param jobId job ID
* @return pipeline job info
*/
PipelineJobInfo getJobInfo(String jobId);

/**
* Build pipeline data consistency checker.
*
* @return job option
* @param jobConfig job configuration
* @param processContext process context
* @param progressContext consistency check job item progress context
* @return all logic tables check result
*/
PipelineJobOption getOption();
PipelineDataConsistencyChecker buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, TransmissionProcessContext processContext, ConsistencyCheckJobItemProgressContext progressContext);

@Override
String getType();
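
Not part of the commit: a minimal sketch of what an implementation of the merged interface could look like, assuming only the method declarations visible in the hunk above. The class name FixtureJobType, the placeholder return values, and the UnsupportedOperationException bodies are illustrative; real job types supply concrete swappers, job classes, and checkers, and may simply inherit the four default methods.

import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;

// Hypothetical fixture, not part of this commit; it only exercises the merged interface above.
public final class FixtureJobType implements PipelineJobType {
    
    @Override
    public String getCode() {
        return "99"; // illustrative placeholder code
    }
    
    @Override
    public <Y extends YamlConfiguration, T extends PipelineJobConfiguration> YamlPipelineJobConfigurationSwapper<Y, T> getYamlJobConfigurationSwapper() {
        throw new UnsupportedOperationException("return the job-specific YAML job configuration swapper");
    }
    
    @Override
    public <T extends PipelineJobItemProgress> YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T> getYamlJobItemProgressSwapper() {
        throw new UnsupportedOperationException("return the job-specific YAML job item progress swapper");
    }
    
    @Override
    public Class<? extends PipelineJob> getJobClass() {
        throw new UnsupportedOperationException("return the concrete PipelineJob implementation class");
    }
    
    // The four default methods (isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished,
    // getToBeStartDisabledNextJobType, getToBeStoppedPreviousJobType,
    // isForceNoShardingWhenConvertToJobConfigurationPOJO) are inherited as declared above.
    
    @Override
    public PipelineJobInfo getJobInfo(final String jobId) {
        throw new UnsupportedOperationException("build the job info for " + jobId);
    }
    
    @Override
    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig,
                                                                      final TransmissionProcessContext processContext,
                                                                      final ConsistencyCheckJobItemProgressContext progressContext) {
        throw new UnsupportedOperationException("build the data consistency checker");
    }
    
    @Override
    public String getType() {
        return "FIXTURE"; // illustrative SPI type name
    }
}
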
@@ -27,7 +27,6 @@
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
@@ -56,7 +55,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {
private final String jobId;

@Getter(AccessLevel.PROTECTED)
private final PipelineJobOption jobOption;
private final PipelineJobType jobType;

private final AtomicBoolean stopping = new AtomicBoolean(false);

@@ -66,7 +65,7 @@ public abstract class AbstractPipelineJob implements PipelineJob {

protected AbstractPipelineJob(final String jobId) {
this.jobId = jobId;
jobOption = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getOption();
jobType = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType());
}

/**
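
Not part of the commit: the constructor change above collapses a two-step lookup (resolve the job type, then unwrap its option) into one. A hedged sketch of the equivalent standalone lookup, using only calls that appear in this commit's hunks; the helper name resolveJobType is illustrative.

// Illustrative helper, not from the diff: resolve the merged PipelineJobType straight from a job id.
private static PipelineJobType resolveJobType(final String jobId) {
    return TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType());
}

// Callers that previously needed a PipelineJobOption now take the job type itself,
// e.g. the failure handling in the next hunk: new PipelineJobManager(getJobType()).
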
@@ -61,7 +61,7 @@ public void execute(final ShardingContext shardingContext) {
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
processFailed(new PipelineJobManager(getJobOption()), jobId, shardingItem, ex);
processFailed(new PipelineJobManager(getJobType()), jobId, shardingItem, ex);
throw ex;
}
}

This file was deleted.

@@ -130,8 +130,8 @@ private static synchronized void persist(final String jobId, final int shardingI
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getOption()
.getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobId).getType()).getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost {} ms", jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
@@ -19,9 +19,9 @@

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -35,7 +35,7 @@
@RequiredArgsConstructor
public final class PipelineJobConfigurationManager {

private final PipelineJobOption jobOption;
private final PipelineJobType jobType;

/**
* Get job configuration.
@@ -46,7 +46,7 @@ public final class PipelineJobConfigurationManager {
*/
@SuppressWarnings("unchecked")
public <T extends PipelineJobConfiguration> T getJobConfiguration(final String jobId) {
return (T) jobOption.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
return (T) jobType.getYamlJobConfigurationSwapper().swapToObject(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getJobParameter());
}

/**
@@ -58,8 +58,8 @@ public <T extends PipelineJobConfiguration> T getJobConfiguration(final String j
public JobConfigurationPOJO convertToJobConfigurationPOJO(final PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO result = new JobConfigurationPOJO();
result.setJobName(jobConfig.getJobId());
result.setShardingTotalCount(jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount());
result.setJobParameter(YamlEngine.marshal(jobOption.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
result.setShardingTotalCount(jobType.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 : jobConfig.getJobShardingCount());
result.setJobParameter(YamlEngine.marshal(jobType.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
String createTimeFormat = LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter());
result.getProps().setProperty("create_time", createTimeFormat);
result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
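
Not part of the commit: a hedged usage sketch of PipelineJobConfigurationManager after the merge, combining only calls visible in the hunks above. Variable names are illustrative, and the sketch assumes the Lombok @RequiredArgsConstructor generates a constructor taking the PipelineJobType field.

// Illustrative only: resolve the job type from a job id and drive the configuration manager with it.
PipelineJobType jobType = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType());
PipelineJobConfigurationManager configManager = new PipelineJobConfigurationManager(jobType);
// Swaps the stored YAML job parameter back into a typed job configuration.
PipelineJobConfiguration jobConfig = configManager.getJobConfiguration(jobId);
// Builds the ElasticJob POJO, honoring isForceNoShardingWhenConvertToJobConfigurationPOJO().
JobConfigurationPOJO jobConfigPOJO = configManager.convertToJobConfigurationPOJO(jobConfig);
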
