Skip to content

Commit

Permalink
Add generic type on JobConfigurationChangedProcessor (#29382)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Dec 12, 2023
1 parent 460349e commit 88f11c7
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
Expand All @@ -40,8 +41,11 @@ public final class JobConfigurationChangedProcessEngine {
* @param eventType event type
* @param jobConfig pipeline job configuration
* @param processor pipeline job configuration changed processor
* @param <T> type of pipeline job configuration
*/
public void process(final Type eventType, final JobConfiguration jobConfig, final JobConfigurationChangedProcessor processor) {
@SuppressWarnings("unchecked")
public <T extends PipelineJobConfiguration> void process(final Type eventType, final JobConfiguration jobConfig, final JobConfigurationChangedProcessor<T> processor) {
T pipelineJobConfig = (T) PipelineJobIdUtils.parseJobType(jobConfig.getJobName()).getYamlJobConfigurationSwapper().swapToObject(jobConfig.getJobParameter());
String jobId = jobConfig.getJobName();
if (jobConfig.isDisabled()) {
PipelineJobRegistry.stop(jobId);
Expand All @@ -54,7 +58,7 @@ public void process(final Type eventType, final JobConfiguration jobConfig, fina
if (PipelineJobRegistry.isExisting(jobId)) {
log.info("{} added to executing jobs failed since it already exists", jobId);
} else {
executeJob(jobConfig, processor);
executeJob(jobConfig, pipelineJobConfig, processor);
}
break;
case DELETED:
Expand All @@ -73,8 +77,8 @@ private void disableJob(final String jobId) {
}
}

private void executeJob(final JobConfiguration jobConfig, final JobConfigurationChangedProcessor processor) {
PipelineJob job = processor.createJob();
private <T extends PipelineJobConfiguration> void executeJob(final JobConfiguration jobConfig, final T pipelineJobConfig, final JobConfigurationChangedProcessor<T> processor) {
PipelineJob job = processor.createJob(pipelineJobConfig);
String jobId = jobConfig.getJobName();
PipelineJobRegistry.add(jobId, job);
OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(PipelineJobIdUtils.parseContextKey(jobId)), job, jobConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,32 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor;

import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;

/**
* Job configuration changed processor.
*
* @param <T> type of pipeline job configuration
*/
public interface JobConfigurationChangedProcessor extends TypedSPI {
public interface JobConfigurationChangedProcessor<T extends PipelineJobConfiguration> extends TypedSPI {

/**
* Create pipeline job.
*
*
* @param jobConfig pipeline job configuration
* @return pipeline job
*/
PipelineJob createJob();
PipelineJob createJob(T jobConfig);

/**
* Clean pipeline job.
*
* @param jobConfig pipeline job configuration
*/
void clean(JobConfiguration jobConfig);
default void clean(JobConfiguration jobConfig) {
}

@Override
String getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,18 @@
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;

/**
* Consistency check job configuration changed processor.
*/
public final class ConsistencyCheckJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor {
public final class ConsistencyCheckJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor<ConsistencyCheckJobConfiguration> {

@Override
public PipelineJob createJob() {
public PipelineJob createJob(final ConsistencyCheckJobConfiguration jobConfig) {
return new ConsistencyCheckJob();
}

@Override
public void clean(final JobConfiguration jobConfig) {
}

@Override
public String getType() {
return "CONSISTENCY_CHECK";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.config.processor.JobConfigurationChangedProcessor;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.swapper.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;

/**
* Migration job configuration changed processor.
*/
public final class MigrationJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor {
public final class MigrationJobConfigurationChangedProcessor implements JobConfigurationChangedProcessor<MigrationJobConfiguration> {

@Override
public PipelineJob createJob() {
public PipelineJob createJob(final MigrationJobConfiguration jobConfig) {
return new MigrationJob();
}

Expand Down

0 comments on commit 88f11c7

Please sign in to comment.