Skip to content

Commit

Permalink
Refactor TransmissionJobManager.showProcessConfiguration (#29262)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Dec 2, 2023
1 parent 7206d6b commit 51834fc
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,20 @@ public void alterProcessConfiguration(final PipelineContextKey contextKey, final
}

/**
* Show process configuration.
* Show pipeline process configuration.
*
* @param jobId job id
* @return pipeline process configuration
*/
public PipelineProcessConfiguration showProcessConfiguration(final String jobId) {
return showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobId));
}

/**
* Show pipeline process configuration.
*
* @param contextKey context key
* @return process configuration, non-null
* @return pipeline process configuration
*/
public PipelineProcessConfiguration showProcessConfiguration(final PipelineContextKey contextKey) {
return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, jobOption.getType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void execute(final ShardingContext shardingContext) {

private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(jobConfig.getJobId());
TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected void runBlocking() {
TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
try {
PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()));
PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(parentJobConfig.getJobId());
PipelineDataConsistencyChecker checker = jobOption.buildDataConsistencyChecker(
parentJobConfig, new TransmissionProcessContext(parentJobConfig.getJobId(), processConfig), jobItemContext.getProgressContext());
consistencyChecker.set(checker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
Expand Down Expand Up @@ -87,7 +86,7 @@ protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingC
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(jobConfig.getJobId());
TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ void assertDataConsistencyCheck() {
MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
initTableData(jobConfig);
jobManager.start(jobConfig);
PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(jobConfig.getJobId());
TransmissionProcessContext processContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
Map<String, TableDataConsistencyCheckResult> checkResultMap = jobOption.buildDataConsistencyChecker(
jobConfig, processContext, new ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0, "H2")).check("FIXTURE", null);
Expand Down

0 comments on commit 51834fc

Please sign in to comment.