Skip to content

Commit

Permalink
Remove TransmissionJobOption.buildProcessContext (#29261)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Dec 2, 2023
1 parent 34e8601 commit 7206d6b
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,6 @@ default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() {
*/
PipelineJobInfo getJobInfo(String jobId);

/**
* Build transmission process context.
*
* @param jobConfig pipeline job configuration
* @return transmission process context
*/
TransmissionProcessContext buildProcessContext(PipelineJobConfiguration jobConfig);

/**
* Build pipeline data consistency checker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
Expand Down Expand Up @@ -89,6 +90,8 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {

private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());

private final TransmissionJobManager transmissionJobManager = new TransmissionJobManager(jobOption);

private final CDCJobPreparer jobPreparer = new CDCJobPreparer();

private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
Expand Down Expand Up @@ -129,7 +132,8 @@ public void execute(final ShardingContext shardingContext) {

private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
TransmissionProcessContext jobProcessContext = jobOption.buildProcessContext(jobConfig);
PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
Expand All @@ -30,7 +29,6 @@
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;

/**
* CDC job option.
Expand Down Expand Up @@ -61,12 +59,6 @@ public PipelineJobInfo getJobInfo(final String jobId) {
return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
}

@Override
public TransmissionProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
return new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
}

@Override
public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext,
final ConsistencyCheckJobItemProgressContext progressContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
Expand All @@ -30,11 +32,12 @@
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
Expand Down Expand Up @@ -105,7 +108,9 @@ protected void runBlocking() {
TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
try {
PipelineDataConsistencyChecker checker = jobOption.buildDataConsistencyChecker(parentJobConfig, jobOption.buildProcessContext(parentJobConfig), jobItemContext.getProgressContext());
PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()));
PipelineDataConsistencyChecker checker = jobOption.buildDataConsistencyChecker(
parentJobConfig, new TransmissionProcessContext(parentJobConfig.getJobId(), processConfig), jobItemContext.getProgressContext());
consistencyChecker.set(checker);
Map<String, TableDataConsistencyCheckResult> checkResultMap = checker.check(checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
log.info("job {} with check algorithm '{}' data consistency checker result: {}, stopping: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
Expand Down Expand Up @@ -67,6 +69,8 @@ public final class MigrationJob extends AbstractSimplePipelineJob {

private final MigrationJobOption jobOption = new MigrationJobOption();

private final TransmissionJobManager transmissionJobManager = new TransmissionJobManager(jobOption);

private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());

private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
Expand All @@ -83,7 +87,8 @@ protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingC
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
TransmissionProcessContext jobProcessContext = jobOption.buildProcessContext(jobConfig);
PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
Expand All @@ -29,7 +28,6 @@
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
Expand Down Expand Up @@ -74,12 +72,6 @@ public PipelineJobInfo getJobInfo(final String jobId) {
return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables));
}

@Override
public TransmissionProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
return new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
}

@Override
public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext,
final ConsistencyCheckJobItemProgressContext progressContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
Expand Down Expand Up @@ -190,8 +192,10 @@ void assertDataConsistencyCheck() {
MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
initTableData(jobConfig);
jobManager.start(jobConfig);
PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
TransmissionProcessContext processContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
Map<String, TableDataConsistencyCheckResult> checkResultMap = jobOption.buildDataConsistencyChecker(
jobConfig, jobOption.buildProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0, "H2")).check("FIXTURE", null);
jobConfig, processContext, new ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0, "H2")).check("FIXTURE", null);
assertThat(checkResultMap.size(), is(1));
String checkKey = "t_order";
assertTrue(checkResultMap.get(checkKey).isMatched());
Expand Down

0 comments on commit 7206d6b

Please sign in to comment.