From e4c683c816002620d1f46556fbc880c8f44f492f Mon Sep 17 00:00:00 2001 From: zhangliang Date: Fri, 24 Nov 2023 15:54:56 +0800 Subject: [PATCH] Refactor TransmissionJobAPI --- .../core/job/service/TransmissionJobAPI.java | 19 +++++++++---------- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 12 ++++++------ .../data/pipeline/cdc/core/job/CDCJob.java | 2 +- .../task/ConsistencyCheckTasksRunner.java | 4 ++-- .../scenario/migration/MigrationJob.java | 2 +- .../migration/api/impl/MigrationJobAPI.java | 16 ++++++++-------- .../api/impl/MigrationJobAPITest.java | 4 ++-- 7 files changed, 29 insertions(+), 30 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java index 7a9431b794e29..8a1def38509fb 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java @@ -52,20 +52,20 @@ default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() { /** * Build task configuration. * - * @param pipelineJobConfig pipeline job configuration + * @param jobConfig pipeline job configuration * @param jobShardingItem job sharding item - * @param pipelineProcessConfig pipeline process configuration + * @param processConfig pipeline process configuration * @return task configuration */ - PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig); + PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration processConfig); /** - * Build pipeline process context. + * Build transmission process context. * - * @param pipelineJobConfig pipeline job configuration - * @return pipeline process context + * @param jobConfig pipeline job configuration + * @return transmission process context */ - TransmissionProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig); + TransmissionProcessContext buildProcessContext(PipelineJobConfiguration jobConfig); /** * Extend YAML job configuration. @@ -78,13 +78,12 @@ default YamlTransmissionJobItemProgressSwapper getYamlJobItemProgressSwapper() { /** * Build pipeline data consistency checker. * - * @param pipelineJobConfig job configuration + * @param jobConfig job configuration * @param processContext process context * @param progressContext consistency check job item progress context * @return all logic tables check result */ - PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, TransmissionProcessContext processContext, - ConsistencyCheckJobItemProgressContext progressContext); + PipelineDataConsistencyChecker buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, TransmissionProcessContext processContext, ConsistencyCheckJobItemProgressContext progressContext); /** * Commit pipeline job. diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 72d7179bf334c..839f61bf204cb 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -245,11 +245,11 @@ public void extendYamlJobConfiguration(final PipelineContextKey contextKey, fina } @Override - public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) { + public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) { CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig; TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()); IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableAndSchemaNameMapper); - ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper); + ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper); CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig); log.debug("buildTaskConfiguration, result={}", result); return result; @@ -277,9 +277,9 @@ private ImporterConfiguration buildImporterConfiguration(final CDCJobConfigurati } @Override - public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) { + public CDCProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) { TransmissionJobManager jobManager = new TransmissionJobManager(this); - return new CDCProcessContext(pipelineJobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()))); + return new CDCProcessContext(jobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()))); } @Override @@ -325,8 +325,8 @@ public void rollback(final String jobId) throws SQLException { } @Override - public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final TransmissionProcessContext processContext, - final ConsistencyCheckJobItemProgressContext progressContext) { + public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext, + final ConsistencyCheckJobItemProgressContext progressContext) { throw new UnsupportedOperationException(); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java index 31cde63332dfe..78d0cd9e4b233 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java @@ -109,7 +109,7 @@ public void execute(final ShardingContext shardingContext) { private CDCJobItemContext buildPipelineJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) { Optional initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem); - CDCProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig); + CDCProcessContext jobProcessContext = jobAPI.buildProcessContext(jobConfig); CDCTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig()); return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink); } diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index 58db6fc27c995..291d724dfdbf8 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -105,8 +105,8 @@ protected void runBlocking() { TransmissionJobAPI jobAPI = (TransmissionJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType()); PipelineJobConfiguration parentJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(parentJobId); try { - PipelineDataConsistencyChecker checker = jobAPI.buildPipelineDataConsistencyChecker( - parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext()); + PipelineDataConsistencyChecker checker = jobAPI.buildDataConsistencyChecker( + parentJobConfig, jobAPI.buildProcessContext(parentJobConfig), jobItemContext.getProgressContext()); consistencyChecker.set(checker); Map checkResultMap = checker.check(checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps()); log.info("job {} with check algorithm '{}' data consistency checker result: {}, stopping: {}", diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java index fd958c358956f..3e46a817d49bb 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java @@ -63,7 +63,7 @@ protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingC int shardingItem = shardingContext.getShardingItem(); MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter()); Optional initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem); - MigrationProcessContext jobProcessContext = jobAPI.buildPipelineProcessContext(jobConfig); + MigrationProcessContext jobProcessContext = jobAPI.buildProcessContext(jobConfig); MigrationTaskConfiguration taskConfig = jobAPI.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig()); return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 6d9eb9054234e..331211fdf2d85 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -231,7 +231,7 @@ public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() { } @Override - public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) { + public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) { MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig; IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator( jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem)); @@ -240,7 +240,7 @@ public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfig Map> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap( ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames); ImporterConfiguration importerConfig = buildImporterConfiguration( - jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); + jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper()); MigrationTaskConfiguration result = new MigrationTaskConfiguration( incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig); log.info("buildTaskConfiguration, result={}", result); @@ -275,15 +275,15 @@ private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfi } @Override - public MigrationProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) { - PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())); - return new MigrationProcessContext(pipelineJobConfig.getJobId(), processConfig); + public MigrationProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) { + PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())); + return new MigrationProcessContext(jobConfig.getJobId(), processConfig); } @Override - public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final PipelineJobConfiguration pipelineJobConfig, final TransmissionProcessContext processContext, - final ConsistencyCheckJobItemProgressContext progressContext) { - return new MigrationDataConsistencyChecker((MigrationJobConfiguration) pipelineJobConfig, processContext, progressContext); + public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext, + final ConsistencyCheckJobItemProgressContext progressContext) { + return new MigrationDataConsistencyChecker((MigrationJobConfiguration) jobConfig, processContext, progressContext); } @Override diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 7017e4e41e38a..13c3f46d689e8 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -184,8 +184,8 @@ void assertDataConsistencyCheck() { initTableData(jobConfig); Optional jobId = jobManager.start(jobConfig); assertTrue(jobId.isPresent()); - Map checkResultMap = jobAPI.buildPipelineDataConsistencyChecker( - jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", null); + Map checkResultMap = jobAPI.buildDataConsistencyChecker( + jobConfig, jobAPI.buildProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", null); assertThat(checkResultMap.size(), is(1)); String checkKey = "t_order"; assertTrue(checkResultMap.get(checkKey).isMatched());