diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java index 1530dcb325bfb..c1982c23a3b28 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java @@ -19,17 +19,13 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; -import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; -import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import java.util.Collection; @@ -49,39 +45,6 @@ public final class TransmissionJobManager { private final TransmissionJobOption jobOption; - private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); - - /** - * Alter process configuration. - * - * @param contextKey context key - * @param processConfig process configuration - */ - public void alterProcessConfiguration(final PipelineContextKey contextKey, final PipelineProcessConfiguration processConfig) { - // TODO check rateLimiter type match or not - processConfigPersistService.persist(contextKey, jobOption.getType(), processConfig); - } - - /** - * Show pipeline process configuration. - * - * @param jobId job id - * @return pipeline process configuration - */ - public PipelineProcessConfiguration showProcessConfiguration(final String jobId) { - return showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobId)); - } - - /** - * Show pipeline process configuration. - * - * @param contextKey context key - * @return pipeline process configuration - */ - public PipelineProcessConfiguration showProcessConfiguration(final PipelineContextKey contextKey) { - return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, jobOption.getType())); - } - /** * Get job infos. * diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java index 843bb10a8e873..ac54ee21af827 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java @@ -19,32 +19,29 @@ import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; -import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; -import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; +import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow; -import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.infra.util.json.JsonUtils; import java.util.Arrays; import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; /** * Show streaming rule executor. */ public final class ShowStreamingRuleExecutor implements QueryableRALExecutor { + private final PipelineProcessConfigurationPersistService persistService = new PipelineProcessConfigurationPersistService(); + @Override public Collection getRows(final ShowStreamingRuleStatement sqlStatement) { - PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "STREAMING").getOption()) - .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY)); - Collection result = new LinkedList<>(); - result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); - return result; + PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(persistService.load(new PipelineContextKey(InstanceType.PROXY), "STREAMING")); + return Collections.singleton(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); } private String getString(final Object obj) { diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java index e98dd3c8a356b..5da8e693eaa74 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java @@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; @@ -58,7 +59,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; +import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; import org.apache.shardingsphere.elasticjob.api.ShardingContext; @@ -90,7 +91,7 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob { private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); - private final TransmissionJobManager transmissionJobManager = new TransmissionJobManager(jobOption); + private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); private final CDCJobPreparer jobPreparer = new CDCJobPreparer(); @@ -132,7 +133,8 @@ public void execute(final ShardingContext shardingContext) { private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) { Optional initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem); - PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(jobConfig.getJobId()); + PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( + processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobOption.getType())); TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig); CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig()); return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java index ed74f702eddde..63d9849ce9218 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable; import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback; @@ -37,7 +38,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; +import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration; @@ -61,6 +62,8 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner { private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); + @Getter private final ConsistencyCheckJobItemContext jobItemContext; @@ -108,7 +111,8 @@ protected void runBlocking() { TransmissionJobOption jobOption = (TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption(); PipelineJobConfiguration parentJobConfig = new PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId); try { - PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(parentJobConfig.getJobId()); + PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( + processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()), jobType.getType())); PipelineDataConsistencyChecker checker = jobOption.buildDataConsistencyChecker( parentJobConfig, new TransmissionProcessContext(parentJobConfig.getJobId(), processConfig), jobItemContext.getProgressContext()); consistencyChecker.set(checker); diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java index 0075da4b1e1a7..c405924b26612 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java @@ -23,6 +23,7 @@ import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext; import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext; import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; @@ -37,8 +38,9 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; +import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner; import org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; @@ -68,10 +70,10 @@ public final class MigrationJob extends AbstractSimplePipelineJob { private final MigrationJobOption jobOption = new MigrationJobOption(); - private final TransmissionJobManager transmissionJobManager = new TransmissionJobManager(jobOption); - private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper()); + private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); + private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager(); // Shared by all sharding items @@ -86,7 +88,8 @@ protected TransmissionJobItemContext buildPipelineJobItemContext(final ShardingC int shardingItem = shardingContext.getShardingItem(); MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter()); Optional initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem); - PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(jobConfig.getJobId()); + PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( + processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobOption.getType())); TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig); MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig()); return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java index 2e6cf74c7c127..630a682392ed8 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java @@ -18,33 +18,30 @@ package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.queryable; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; -import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; -import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; +import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.distsql.statement.ral.queryable.ShowMigrationRuleStatement; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow; -import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.infra.util.json.JsonUtils; import java.util.Arrays; import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; /** * Show migration rule executor. */ public final class ShowMigrationRuleExecutor implements QueryableRALExecutor { + private final PipelineProcessConfigurationPersistService persistService = new PipelineProcessConfigurationPersistService(); + @Override public Collection getRows(final ShowMigrationRuleStatement sqlStatement) { - PipelineProcessConfiguration processConfig = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption()) - .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY)); - Collection result = new LinkedList<>(); - result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); - return result; + PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(persistService.load(new PipelineContextKey(InstanceType.PROXY), "MIGRATION")); + return Collections.singleton(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); } private String getString(final Object obj) { diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java index 97c9b0dd3a245..fff58c7361c6a 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java @@ -20,8 +20,7 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType; -import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption; -import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; +import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterTransmissionRuleStatement; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; @@ -33,11 +32,13 @@ */ public final class AlterTransmissionRuleUpdater implements RALUpdater { + private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); + @Override public void executeUpdate(final String databaseName, final AlterTransmissionRuleStatement sqlStatement) { - TransmissionJobManager jobManager = new TransmissionJobManager((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, sqlStatement.getJobTypeName()).getOption()); PipelineProcessConfiguration processConfig = TransmissionProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment()); - jobManager.alterProcessConfiguration(new PipelineContextKey(InstanceType.PROXY), processConfig); + String jobType = TypedSPILoader.getService(PipelineJobType.class, sqlStatement.getJobTypeName()).getOption().getType(); + processConfigPersistService.persist(new PipelineContextKey(InstanceType.PROXY), jobType, processConfig); } @Override diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 6add61f9fb2da..46f0812cdc16a 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -21,6 +21,7 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry; import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine; @@ -42,6 +43,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; +import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.MigrationJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; @@ -192,7 +194,8 @@ void assertDataConsistencyCheck() { MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); initTableData(jobConfig); jobManager.start(jobConfig); - PipelineProcessConfiguration processConfig = new TransmissionJobManager(jobOption).showProcessConfiguration(jobConfig.getJobId()); + PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue( + new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobOption.getType())); TransmissionProcessContext processContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig); Map checkResultMap = jobOption.buildDataConsistencyChecker( jobConfig, processContext, new ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0, "H2")).check("FIXTURE", null);