From 2781579f9532dccfbfe531af4de7e2af34fb1d75 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sat, 18 Nov 2023 22:50:40 +0800 Subject: [PATCH] Add InventoryIncrementalJobManager --- .../service/InventoryIncrementalJobAPI.java | 62 ------- .../InventoryIncrementalJobManager.java | 164 ++++++++++++++++++ ...bstractInventoryIncrementalJobAPIImpl.java | 99 +---------- .../query/ShowStreamingRuleExecutor.java | 4 +- .../ShowMigrationCheckAlgorithmsExecutor.java | 6 +- .../update/CheckMigrationJobUpdater.java | 4 +- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 4 +- .../api/impl/ConsistencyCheckJobAPI.java | 8 +- .../migration/api/impl/MigrationJobAPI.java | 3 +- .../MigrationDataConsistencyChecker.java | 3 +- .../prepare/MigrationJobPreparer.java | 7 +- .../queryable/ShowMigrationRuleExecutor.java | 4 +- .../AlterInventoryIncrementalRuleUpdater.java | 6 +- .../api/impl/MigrationJobAPITest.java | 14 +- 14 files changed, 206 insertions(+), 182 deletions(-) create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java index fa4594f7eab8b..0569315a40f25 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java @@ -22,21 +22,15 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper; -import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration; import java.sql.SQLException; -import java.util.Collection; import java.util.List; -import java.util.Map; /** * Inventory incremental job API. @@ -83,46 +77,6 @@ default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwa */ void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig); - /** - * Alter process configuration. - * - * @param contextKey context key - * @param processConfig process configuration - */ - void alterProcessConfiguration(PipelineContextKey contextKey, PipelineProcessConfiguration processConfig); - - /** - * Show process configuration. - * - * @param contextKey context key - * @return process configuration, non-null - */ - PipelineProcessConfiguration showProcessConfiguration(PipelineContextKey contextKey); - - /** - * Persist job offset info. - * - * @param jobId job ID - * @param jobOffsetInfo job offset info - */ - void persistJobOffsetInfo(String jobId, JobOffsetInfo jobOffsetInfo); - - /** - * Get job offset info. - * - * @param jobId job ID - * @return job offset progress - */ - JobOffsetInfo getJobOffsetInfo(String jobId); - - /** - * Get job progress. - * - * @param pipelineJobConfig job configuration - * @return each sharding item progress - */ - Map getJobProgress(PipelineJobConfiguration pipelineJobConfig); - /** * Get job infos. * @@ -131,13 +85,6 @@ default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwa */ List getJobItemInfos(String jobId); - /** - * List all data consistency check algorithms from SPI. - * - * @return data consistency check algorithms - */ - Collection listDataConsistencyCheckAlgorithms(); - /** * Build pipeline data consistency checker. * @@ -149,15 +96,6 @@ default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwa PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, InventoryIncrementalProcessContext processContext, ConsistencyCheckJobItemProgressContext progressContext); - /** - * Aggregate data consistency check results. - * - * @param jobId job ID - * @param checkResults check results - * @return check success or not - */ - boolean aggregateDataConsistencyCheckResults(String jobId, Map checkResults); - /** * Commit pipeline job. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java new file mode 100644 index 0000000000000..1a29cd942b7d8 --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.job.service; + +import com.google.common.base.Preconditions; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; +import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; +import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; +import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; +import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo; +import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper; +import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; +import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; +import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; +import org.apache.shardingsphere.infra.database.core.type.DatabaseType; +import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader; +import org.apache.shardingsphere.infra.spi.annotation.SPIDescription; +import org.apache.shardingsphere.infra.util.yaml.YamlEngine; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Inventory incremental job manager. + */ +@RequiredArgsConstructor +@Slf4j +public final class InventoryIncrementalJobManager { + + private final PipelineJobAPI jobAPI; + + private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); + + private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new YamlJobOffsetInfoSwapper(); + + /** + * Alter process configuration. + * + * @param contextKey context key + * @param processConfig process configuration + */ + public void alterProcessConfiguration(final PipelineContextKey contextKey, final PipelineProcessConfiguration processConfig) { + // TODO check rateLimiter type match or not + processConfigPersistService.persist(contextKey, jobAPI.getType(), processConfig); + } + + /** + * Show process configuration. + * + * @param contextKey context key + * @return process configuration, non-null + */ + public PipelineProcessConfiguration showProcessConfiguration(final PipelineContextKey contextKey) { + return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, jobAPI.getType())); + } + + /** + * Get job progress. + * + * @param jobConfig pipeline job configuration + * @return each sharding item progress + */ + public Map getJobProgress(final PipelineJobConfiguration jobConfig) { + PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + String jobId = jobConfig.getJobId(); + JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); + return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> { + Optional jobItemProgress = jobItemManager.getProgress(jobId, each); + jobItemProgress.ifPresent(optional -> optional.setActive(!jobConfigPOJO.isDisabled())); + map.put(each, jobItemProgress.orElse(null)); + }, LinkedHashMap::putAll); + } + + /** + * Persist job offset info. + * + * @param jobId job ID + * @param jobOffsetInfo job offset info + */ + public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo jobOffsetInfo) { + String value = YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo)); + PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId, value); + } + + /** + * Get job offset info. + * + * @param jobId job ID + * @return job offset progress + */ + public JobOffsetInfo getJobOffsetInfo(final String jobId) { + Optional offsetInfo = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId); + if (offsetInfo.isPresent()) { + YamlJobOffsetInfo info = YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class); + return jobOffsetInfoSwapper.swapToObject(info); + } + return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo()); + } + + /** + * List all data consistency check algorithms from SPI. + * + * @return data consistency check algorithms + */ + public Collection listDataConsistencyCheckAlgorithms() { + Collection result = new LinkedList<>(); + for (TableDataConsistencyChecker each : ShardingSphereServiceLoader.getServiceInstances(TableDataConsistencyChecker.class)) { + SPIDescription description = each.getClass().getAnnotation(SPIDescription.class); + String typeAliases = each.getTypeAliases().stream().map(Object::toString).collect(Collectors.joining(",")); + result.add(new DataConsistencyCheckAlgorithmInfo(each.getType(), typeAliases, getSupportedDatabaseTypes(each.getSupportedDatabaseTypes()), null == description ? "" : description.value())); + } + return result; + } + + private Collection getSupportedDatabaseTypes(final Collection supportedDatabaseTypes) { + return supportedDatabaseTypes.isEmpty() ? ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) : supportedDatabaseTypes; + } + + /** + * Aggregate data consistency check results. + * + * @param jobId job ID + * @param checkResults check results + * @return check success or not + */ + public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map checkResults) { + Preconditions.checkArgument(!checkResults.isEmpty(), "checkResults empty, jobId:", jobId); + for (Entry entry : checkResults.entrySet()) { + TableDataConsistencyCheckResult checkResult = entry.getValue(); + if (!checkResult.isMatched()) { + return false; + } + } + return true; + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java index c3d0b1d689a4d..8562706aafaa1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java @@ -17,85 +17,39 @@ package org.apache.shardingsphere.data.pipeline.core.job.service.impl; -import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; -import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; -import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; -import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo; -import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper; -import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; -import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; -import org.apache.shardingsphere.infra.database.core.type.DatabaseType; -import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader; -import org.apache.shardingsphere.infra.spi.annotation.SPIDescription; -import org.apache.shardingsphere.infra.util.yaml.YamlEngine; -import java.util.Collection; -import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.IntStream; /** * Abstract inventory incremental job API implementation. */ -@Slf4j public abstract class AbstractInventoryIncrementalJobAPIImpl implements InventoryIncrementalJobAPI { - private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); - - private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new YamlJobOffsetInfoSwapper(); - - @Override - public void alterProcessConfiguration(final PipelineContextKey contextKey, final PipelineProcessConfiguration processConfig) { - // TODO check rateLimiter type match or not - processConfigPersistService.persist(contextKey, getType(), processConfig); - } - - @Override - public PipelineProcessConfiguration showProcessConfiguration(final PipelineContextKey contextKey) { - return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, getType())); - } - - @Override - public Map getJobProgress(final PipelineJobConfiguration jobConfig) { - PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()); - String jobId = jobConfig.getJobId(); - JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); - return IntStream.range(0, jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, each) -> { - Optional jobItemProgress = jobItemManager.getProgress(jobId, each); - jobItemProgress.ifPresent(optional -> optional.setActive(!jobConfigPOJO.isDisabled())); - map.put(each, jobItemProgress.orElse(null)); - }, LinkedHashMap::putAll); - } - @Override public List getJobItemInfos(final String jobId) { PipelineJobManager jobManager = new PipelineJobManager(this); - PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()); JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); PipelineJobConfiguration jobConfig = jobManager.getJobConfiguration(jobConfigPOJO); long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0")); - Map jobProgress = getJobProgress(jobConfig); + InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(this); + Map jobProgress = inventoryIncrementalJobManager.getJobProgress(jobConfig); List result = new LinkedList<>(); + PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()); for (Entry entry : jobProgress.entrySet()) { int shardingItem = entry.getKey(); TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) getJobInfo(jobId); @@ -115,49 +69,4 @@ public List getJobItemInfos(final String jobId) } return result; } - - @Override - public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo jobOffsetInfo) { - String value = YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo)); - PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId, value); - } - - @Override - public JobOffsetInfo getJobOffsetInfo(final String jobId) { - Optional offsetInfo = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId); - if (offsetInfo.isPresent()) { - YamlJobOffsetInfo info = YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class); - return jobOffsetInfoSwapper.swapToObject(info); - } - return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo()); - } - - @Override - public Collection listDataConsistencyCheckAlgorithms() { - Collection result = new LinkedList<>(); - for (TableDataConsistencyChecker each : ShardingSphereServiceLoader.getServiceInstances(TableDataConsistencyChecker.class)) { - SPIDescription description = each.getClass().getAnnotation(SPIDescription.class); - String typeAliases = each.getTypeAliases().stream().map(Object::toString).collect(Collectors.joining(",")); - result.add(new DataConsistencyCheckAlgorithmInfo(each.getType(), typeAliases, getSupportedDatabaseTypes(each.getSupportedDatabaseTypes()), null == description ? "" : description.value())); - } - return result; - } - - private Collection getSupportedDatabaseTypes(final Collection supportedDatabaseTypes) { - return supportedDatabaseTypes.isEmpty() ? ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) : supportedDatabaseTypes; - } - - @Override - public boolean aggregateDataConsistencyCheckResults(final String jobId, final Map checkResults) { - if (checkResults.isEmpty()) { - throw new IllegalArgumentException("checkResults empty, jobId:" + jobId); - } - for (Entry entry : checkResults.entrySet()) { - TableDataConsistencyCheckResult checkResult = entry.getValue(); - if (!checkResult.isMatched()) { - return false; - } - } - return true; - } } diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java index f646e82611bde..7e63268e8ba64 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java @@ -20,7 +20,7 @@ import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; @@ -39,7 +39,7 @@ public final class ShowStreamingRuleExecutor implements QueryableRALExecutor getRows(final ShowStreamingRuleStatement sqlStatement) { - PipelineProcessConfiguration processConfig = ((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING")) + PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING")) .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY)); Collection result = new LinkedList<>(); result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java index 6b99ab543536c..40c7949c6faaa 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java @@ -17,7 +17,7 @@ package org.apache.shardingsphere.migration.distsql.handler.query; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.database.core.type.DatabaseType; @@ -36,8 +36,8 @@ public final class ShowMigrationCheckAlgorithmsExecutor implements QueryableRALE @Override public Collection getRows(final ShowMigrationCheckAlgorithmsStatement sqlStatement) { - InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"); - return jobAPI.listDataConsistencyCheckAlgorithms().stream().map( + InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")); + return inventoryIncrementalJobManager.listDataConsistencyCheckAlgorithms().stream().map( each -> new LocalDataQueryResultRow(each.getType(), each.getTypeAliases(), each.getSupportedDatabaseTypes().stream().map(DatabaseType::getType).collect(Collectors.joining(",")), each.getDescription())) .collect(Collectors.toList()); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java index 0789b0519b0ac..86d9150871211 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java @@ -20,6 +20,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter; @@ -54,7 +55,8 @@ public void executeUpdate(final String databaseName, final CheckMigrationStateme } private void verifyInventoryFinished(final MigrationJobConfiguration jobConfig) { - ShardingSpherePreconditions.checkState(PipelineJobProgressDetector.isInventoryFinished(jobConfig.getJobShardingCount(), migrationJobAPI.getJobProgress(jobConfig).values()), + InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(migrationJobAPI); + ShardingSpherePreconditions.checkState(PipelineJobProgressDetector.isInventoryFinished(jobConfig.getJobShardingCount(), inventoryIncrementalJobManager.getJobProgress(jobConfig).values()), () -> new PipelineInvalidParameterException("Inventory is not finished.")); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 9f90858e59065..2484d75d9cac5 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -67,6 +67,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; @@ -277,7 +278,8 @@ private ImporterConfiguration buildImporterConfiguration(final CDCJobConfigurati @Override public CDCProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) { - return new CDCProcessContext(pipelineJobConfig.getJobId(), showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()))); + InventoryIncrementalJobManager jobManager = new InventoryIncrementalJobManager(this); + return new CDCProcessContext(pipelineJobConfig.getJobId(), jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()))); } @Override diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index ff0e63ce7af69..d2689235c2cc5 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -31,7 +31,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException; import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; @@ -279,9 +279,9 @@ private void fillInJobItemInfoWithCheckResult(final ConsistencyCheckJobItemInfo if (checkJobResult.isEmpty()) { result.setCheckSuccess(null); } else { - InventoryIncrementalJobAPI inventoryIncrementalJobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService( - PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType()); - result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult)); + InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager( + TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType())); + result.setCheckSuccess(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult)); } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 3d12a4363bc0f..c29a961158ded 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -53,6 +53,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl; @@ -277,7 +278,7 @@ private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfi @Override public MigrationProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) { - PipelineProcessConfiguration processConfig = showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())); + PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())); return new MigrationProcessContext(pipelineJobConfig.getJobId(), processConfig); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java index addf2dee11af1..c189075c554b0 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java @@ -41,6 +41,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker; import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI; import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; @@ -98,7 +99,7 @@ public Map check(final String algorithm } private long getRecordsCount() { - Map jobProgress = new MigrationJobAPI().getJobProgress(jobConfig); + Map jobProgress = new InventoryIncrementalJobManager(new MigrationJobAPI()).getJobProgress(jobConfig); return jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum(); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java index 2857e63f58e0e..358f340518048 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java @@ -45,6 +45,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter; import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils; @@ -84,6 +85,8 @@ public final class MigrationJobPreparer { private final PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + private final InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI); + /** * Do prepare work. * @@ -133,12 +136,12 @@ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItem if (lockContext.tryLock(lockDefinition, 600000)) { log.info("try lock success, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis); try { - JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobId); + JobOffsetInfo offsetInfo = inventoryIncrementalJobManager.getJobOffsetInfo(jobId); if (!offsetInfo.isTargetSchemaTableCreated()) { jobItemContext.setStatus(JobStatus.PREPARING); jobItemManager.updateStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING); prepareAndCheckTarget(jobItemContext); - jobAPI.persistJobOffsetInfo(jobId, new JobOffsetInfo(true)); + inventoryIncrementalJobManager.persistJobOffsetInfo(jobId, new JobOffsetInfo(true)); } } finally { log.info("unlock, jobId={}, shardingItem={}, cost {} ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - startTimeMillis); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java index e592dd7ee12fb..4100c69bc74c9 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java @@ -19,7 +19,7 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.distsql.statement.ral.queryable.ShowMigrationRuleStatement; @@ -39,7 +39,7 @@ public final class ShowMigrationRuleExecutor implements QueryableRALExecutor getRows(final ShowMigrationRuleStatement sqlStatement) { - PipelineProcessConfiguration processConfig = ((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")) + PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")) .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY)); Collection result = new LinkedList<>(); result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java index 4931418c4994a..b07cae2401fa9 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java @@ -19,7 +19,7 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement; @@ -34,9 +34,9 @@ public final class AlterInventoryIncrementalRuleUpdater implements RALUpdater jobItemManager; private static DatabaseType databaseType; @@ -101,6 +104,7 @@ static void beforeClass() { PipelineContextUtils.mockModeConfigAndContextManager(); jobAPI = new MigrationJobAPI(); jobManager = new PipelineJobManager(jobAPI); + inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI); jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); String jdbcUrl = "jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL"; databaseType = DatabaseTypeFactory.get(jdbcUrl); @@ -171,7 +175,7 @@ void assertGetProgress() { MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); Optional jobId = jobManager.start(jobConfig); assertTrue(jobId.isPresent()); - Map jobProgressMap = jobAPI.getJobProgress(jobConfig); + Map jobProgressMap = inventoryIncrementalJobManager.getJobProgress(jobConfig); assertThat(jobProgressMap.size(), is(1)); } @@ -191,7 +195,7 @@ void assertDataConsistencyCheck() { @Test void assertAggregateEmptyDataConsistencyCheckResults() { - assertThrows(IllegalArgumentException.class, () -> jobAPI.aggregateDataConsistencyCheckResults("foo_job", Collections.emptyMap())); + assertThrows(IllegalArgumentException.class, () -> inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job", Collections.emptyMap())); } @Test @@ -199,7 +203,7 @@ void assertAggregateDifferentDataConsistencyCheckResults() { Map checkResults = new LinkedHashMap<>(2, 1F); checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true)); checkResults.put("bar_tbl", new TableDataConsistencyCheckResult(false)); - assertFalse(jobAPI.aggregateDataConsistencyCheckResults("foo_job", checkResults)); + assertFalse(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job", checkResults)); } @Test @@ -207,7 +211,7 @@ void assertAggregateSameDataConsistencyCheckResults() { Map checkResults = new LinkedHashMap<>(2, 1F); checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true)); checkResults.put("bar_tbl", new TableDataConsistencyCheckResult(true)); - assertTrue(jobAPI.aggregateDataConsistencyCheckResults("foo_job", checkResults)); + assertTrue(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job", checkResults)); } @Test @@ -218,7 +222,7 @@ void assertSwitchClusterConfigurationSucceed() { MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig); jobItemManager.persistProgress(jobItemContext); jobItemManager.updateStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK); - Map progress = jobAPI.getJobProgress(jobConfig); + Map progress = inventoryIncrementalJobManager.getJobProgress(jobConfig); for (Entry entry : progress.entrySet()) { assertThat(entry.getValue().getStatus(), is(JobStatus.EXECUTE_INVENTORY_TASK)); }