From 87903c685866423501b878608d10ab7e20e4a73f Mon Sep 17 00:00:00 2001 From: zhangliang Date: Sun, 19 Nov 2023 01:02:27 +0800 Subject: [PATCH] Remove AbstractInventoryIncrementalJobAPIImpl --- .../service/InventoryIncrementalJobAPI.java | 10 --- .../InventoryIncrementalJobManager.java | 42 ++++++++++- ...bstractInventoryIncrementalJobAPIImpl.java | 72 ------------------- .../query/ShowStreamingJobStatusExecutor.java | 3 +- .../query/ShowStreamingRuleExecutor.java | 3 +- .../ShowMigrationCheckAlgorithmsExecutor.java | 3 +- .../query/ShowMigrationJobStatusExecutor.java | 3 +- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 4 +- .../api/impl/ConsistencyCheckJobAPI.java | 3 +- .../migration/api/impl/MigrationJobAPI.java | 4 +- .../queryable/ShowMigrationRuleExecutor.java | 3 +- .../AlterInventoryIncrementalRuleUpdater.java | 3 +- .../api/impl/MigrationJobAPITest.java | 4 +- 13 files changed, 61 insertions(+), 96 deletions(-) delete mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java index 0569315a40f25..76a66b00e6b60 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java @@ -23,14 +23,12 @@ import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper; -import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration; import java.sql.SQLException; -import java.util.List; /** * Inventory incremental job API. @@ -77,14 +75,6 @@ default YamlInventoryIncrementalJobItemProgressSwapper getYamlJobItemProgressSwa */ void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig); - /** - * Get job infos. - * - * @param jobId job ID - * @return job item infos - */ - List getJobItemInfos(String jobId); - /** * Build pipeline data consistency checker. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java index efe7ce8af46ae..e31ee4796b233 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java @@ -23,11 +23,14 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo; import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper; import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo; +import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; +import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult; import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; @@ -41,7 +44,9 @@ import java.util.Collection; import java.util.LinkedHashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -52,7 +57,7 @@ @RequiredArgsConstructor public final class InventoryIncrementalJobManager { - private final PipelineJobAPI jobAPI; + private final InventoryIncrementalJobAPI jobAPI; private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); @@ -77,6 +82,41 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, jobAPI.getType())); } + /** + * Get job infos. + * + * @param jobId job ID + * @return job item infos + */ + public List getJobItemInfos(final String jobId) { + PipelineJobManager jobManager = new PipelineJobManager(jobAPI); + JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); + PipelineJobConfiguration jobConfig = jobManager.getJobConfiguration(jobConfigPOJO); + long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0")); + InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI); + Map jobProgress = inventoryIncrementalJobManager.getJobProgress(jobConfig); + List result = new LinkedList<>(); + PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()); + for (Entry entry : jobProgress.entrySet()) { + int shardingItem = entry.getKey(); + TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) jobAPI.getJobInfo(jobId); + InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue(); + String errorMessage = jobItemManager.getErrorMessage(jobId, shardingItem); + if (null == jobItemProgress) { + result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, errorMessage)); + continue; + } + int inventoryFinishedPercentage = 0; + if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemProgress.getStatus() || JobStatus.FINISHED == jobItemProgress.getStatus()) { + inventoryFinishedPercentage = 100; + } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) { + inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount()); + } + result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage)); + } + return result; + } + /** * Get job progress. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java deleted file mode 100644 index 8562706aafaa1..0000000000000 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.core.job.service.impl; - -import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; -import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; -import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; -import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; -import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; -import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; -import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; - -/** - * Abstract inventory incremental job API implementation. - */ -public abstract class AbstractInventoryIncrementalJobAPIImpl implements InventoryIncrementalJobAPI { - - @Override - public List getJobItemInfos(final String jobId) { - PipelineJobManager jobManager = new PipelineJobManager(this); - JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); - PipelineJobConfiguration jobConfig = jobManager.getJobConfiguration(jobConfigPOJO); - long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0")); - InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(this); - Map jobProgress = inventoryIncrementalJobManager.getJobProgress(jobConfig); - List result = new LinkedList<>(); - PipelineJobItemManager jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper()); - for (Entry entry : jobProgress.entrySet()) { - int shardingItem = entry.getKey(); - TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) getJobInfo(jobId); - InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue(); - String errorMessage = jobItemManager.getErrorMessage(jobId, shardingItem); - if (null == jobItemProgress) { - result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, errorMessage)); - continue; - } - int inventoryFinishedPercentage = 0; - if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemProgress.getStatus() || JobStatus.FINISHED == jobItemProgress.getStatus()) { - inventoryFinishedPercentage = 100; - } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) { - inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount()); - } - result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage)); - } - return result; - } -} diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java index 87fab7477a4b1..60bed768d2257 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java @@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow; @@ -41,7 +42,7 @@ public final class ShowStreamingJobStatusExecutor implements QueryableRALExecuto @Override public Collection getRows(final ShowStreamingStatusStatement sqlStatement) { InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, new CDCJobType().getType()); - List jobItemInfos = jobAPI.getJobItemInfos(sqlStatement.getJobId()); + List jobItemInfos = new InventoryIncrementalJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId()); long currentTimeMillis = System.currentTimeMillis(); return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList()); } diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java index 7e63268e8ba64..78a02f6ea1f9c 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java @@ -20,6 +20,7 @@ import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement; import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; @@ -39,7 +40,7 @@ public final class ShowStreamingRuleExecutor implements QueryableRALExecutor getRows(final ShowStreamingRuleStatement sqlStatement) { - PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING")) + PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING")) .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY)); Collection result = new LinkedList<>(); result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java index 40c7949c6faaa..425d94e993bbd 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.migration.distsql.handler.query; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; @@ -36,7 +37,7 @@ public final class ShowMigrationCheckAlgorithmsExecutor implements QueryableRALE @Override public Collection getRows(final ShowMigrationCheckAlgorithmsStatement sqlStatement) { - InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")); + InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")); return inventoryIncrementalJobManager.listDataConsistencyCheckAlgorithms().stream().map( each -> new LocalDataQueryResultRow(each.getType(), each.getTypeAliases(), each.getSupportedDatabaseTypes().stream().map(DatabaseType::getType).collect(Collectors.joining(",")), each.getDescription())) diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java index b92d9efb73e4c..79684dc0eb1f4 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java @@ -20,6 +20,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow; @@ -40,7 +41,7 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto @Override public Collection getRows(final ShowMigrationStatusStatement sqlStatement) { InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"); - List jobItemInfos = jobAPI.getJobItemInfos(sqlStatement.getJobId()); + List jobItemInfos = new InventoryIncrementalJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId()); long currentTimeMillis = System.currentTimeMillis(); return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList()); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 2484d75d9cac5..135d62f77104e 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -67,11 +67,11 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl; import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap; @@ -98,7 +98,7 @@ * CDC job API. */ @Slf4j -public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl { +public final class CDCJobAPI implements InventoryIncrementalJobAPI { private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index d2689235c2cc5..93520db26f8c1 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -31,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException; import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; @@ -280,7 +281,7 @@ private void fillInJobItemInfoWithCheckResult(final ConsistencyCheckJobItemInfo result.setCheckSuccess(null); } else { InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager( - TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType())); + (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType())); result.setCheckSuccess(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult)); } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index c29a961158ded..4f7a023d21be7 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -53,10 +53,10 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext; import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob; import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId; @@ -110,7 +110,7 @@ * Migration job API. */ @Slf4j -public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImpl { +public final class MigrationJobAPI implements InventoryIncrementalJobAPI { private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService(); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java index 4100c69bc74c9..d9e5391261367 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java @@ -19,6 +19,7 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; @@ -39,7 +40,7 @@ public final class ShowMigrationRuleExecutor implements QueryableRALExecutor getRows(final ShowMigrationRuleStatement sqlStatement) { - PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")) + PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION")) .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY)); Collection result = new LinkedList<>(); result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel()))); diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java index b07cae2401fa9..52648f2cc95c3 100644 --- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java +++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java @@ -19,6 +19,7 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; @@ -34,7 +35,7 @@ public final class AlterInventoryIncrementalRuleUpdater implements RALUpdater jobItemInfos = jobAPI.getJobItemInfos(jobId.get()); + List jobItemInfos = inventoryIncrementalJobManager.getJobItemInfos(jobId.get()); assertThat(jobItemInfos.size(), is(1)); InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0); assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.RUNNING)); @@ -343,7 +343,7 @@ void assertGetJobItemInfosAtIncrementTask() { yamlJobItemProgress.setProcessedRecordsCount(100); yamlJobItemProgress.setInventoryRecordsCount(50); PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress)); - List jobItemInfos = jobAPI.getJobItemInfos(jobId.get()); + List jobItemInfos = inventoryIncrementalJobManager.getJobItemInfos(jobId.get()); InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0); assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.EXECUTE_INCREMENTAL_TASK)); assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(100));