From 51834fc05ff37b5b21d0dcb3a831d46290f588f5 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sat, 2 Dec 2023 13:29:56 +0800 Subject: [PATCH] Refactor TransmissionJobManager.showProcessConfiguration (#29262) --- .../core/job/service/TransmissionJobManager.java | 14 ++++++++++++-- .../shardingsphere/data/pipeline/cdc/CDCJob.java | 2 +- .../task/ConsistencyCheckTasksRunner.java | 2 +- .../pipeline/scenario/migration/MigrationJob.java | 3 +-- .../migration/api/impl/MigrationJobAPITest.java | 2 +- 5 files changed, 16 insertions(+), 7 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java index 0bf66b41e5cd5..5107bf4869533 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java @@ -62,10 +62,20 @@ public void alterProcessConfiguration(final PipelineContextKey contextKey, final } /** - * Show process configuration. + * Show pipeline process configuration. + * + * @param jobId job id + * @return pipeline process configuration + */ + public PipelineProcessConfiguration showProcessConfiguration(final String jobId) { + return showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobId)); + } + + /** + * Show pipeline process configuration. * * @param contextKey context key - * @return process configuration, non-null + * @return pipeline process configuration */ public PipelineProcessConfiguration showProcessConfiguration(final PipelineContextKey contextKey) { return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, jobOption.getType())); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java index 9f1976a72ffdc..e98dd3c8a356b 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java @@ -132,7 +132,7 @@ public void execute(final ShardingContext shardingContext) { private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) { Optional initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem); - PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())); + PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(jobConfig.getJobId()); TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig); CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig()); return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index a93dcf5aed165..ed74f702eddde 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -108,7 +108,7 @@ protected void runBlocking() { TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption(); PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId); try { - PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId())); + PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(parentJobConfig.getJobId()); PipelineDataConsistencyChecker checker = jobOption.buildDataConsistencyChecker( parentJobConfig, new TransmissionProcessContext(parentJobConfig.getJobId(), processConfig), jobItemContext.getProgressContext()); consistencyChecker.set(checker); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java index 6dedfc50e04dd..0075da4b1e1a7 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java @@ -37,7 +37,6 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob; -import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; @@ -87,7 +86,7 @@ protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingC int shardingItem = shardingContext.getShardingItem(); MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter()); Optional initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem); - PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())); + PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(jobConfig.getJobId()); TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig); MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig()); return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager); diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 992f68154a4a6..86e40c727b09e 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -192,7 +192,7 @@ void assertDataConsistencyCheck() { MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); initTableData(jobConfig); jobManager.start(jobConfig); - PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())); + PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(jobConfig.getJobId()); TransmissionProcessContext processContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig); Map checkResultMap = jobOption.buildDataConsistencyChecker( jobConfig, processContext, new ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0, "H2")).check("FIXTURE", null);