From 6f80df1bba452f279ca8761384e330b2b28c40c9 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Tue, 21 Nov 2023 17:12:43 +0800 Subject: [PATCH 1/2] Refactor PipelineJobInfo --- .../pipeline/common/pojo/PipelineJobInfo.java | 27 ++++++++---- .../pojo/TableBasedPipelineJobInfo.java | 42 ------------------- .../InventoryIncrementalJobManager.java | 4 +- .../query/ShowStreamingListExecutor.java | 3 +- .../query/ShowMigrationListExecutor.java | 3 +- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 6 +-- .../migration/api/impl/MigrationJobAPI.java | 6 +-- 7 files changed, 29 insertions(+), 62 deletions(-) delete mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/TableBasedPipelineJobInfo.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/PipelineJobInfo.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/PipelineJobInfo.java index 8f028d3b23401..8377dd0869441 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/PipelineJobInfo.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/PipelineJobInfo.java @@ -17,15 +17,26 @@ package org.apache.shardingsphere.data.pipeline.common.pojo; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + /** - * Pipeline job meta data. + * Pipeline job info. */ -public interface PipelineJobInfo { +@RequiredArgsConstructor +@Getter +public final class PipelineJobInfo { + + private final PipelineJobMetaData jobMetaData; + + private final String databaseName; + + // TODO Rename + private final String table; - /** - * Get job meta data. - * - * @return job meta data - */ - PipelineJobMetaData getJobMetaData(); + public PipelineJobInfo(final PipelineJobMetaData jobMetaData, final String table) { + this.jobMetaData = jobMetaData; + databaseName = null; + this.table = table; + } } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/TableBasedPipelineJobInfo.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/TableBasedPipelineJobInfo.java deleted file mode 100644 index bcf5fa013cfdc..0000000000000 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/TableBasedPipelineJobInfo.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.common.pojo; - -import lombok.Getter; -import lombok.RequiredArgsConstructor; - -/** - * Table based pipeline job info. - */ -@RequiredArgsConstructor -@Getter -public final class TableBasedPipelineJobInfo implements PipelineJobInfo { - - private final PipelineJobMetaData jobMetaData; - - private final String databaseName; - - // TODO Rename - private final String table; - - public TableBasedPipelineJobInfo(final PipelineJobMetaData jobMetaData, final String table) { - this.jobMetaData = jobMetaData; - databaseName = null; - this.table = table; - } -} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java index 7e41b403aaca2..dbdf9aadcb0ba 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java @@ -25,7 +25,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo; -import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; @@ -80,7 +80,7 @@ public List getJobItemInfos(final String jobId) long startTimeMillis = Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0")); Map jobProgress = getJobProgress(jobConfig); List result = new LinkedList<>(); - TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) jobAPI.getJobInfo(jobId); + PipelineJobInfo jobInfo = jobAPI.getJobInfo(jobId); for (Entry entry : jobProgress.entrySet()) { int shardingItem = entry.getKey(); InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue(); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java index c5c64bb35ffa4..4abb07b7ce5ea 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java @@ -20,7 +20,6 @@ import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement; import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; @@ -41,7 +40,7 @@ public final class ShowStreamingListExecutor implements QueryableRALExecutor getRows(final ShowStreamingListStatement sqlStatement) { return pipelineJobManager.getJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(), - ((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(), + each.getDatabaseName(), each.getTable(), each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList()); } diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java index ce1877a57197d..3f1ec3deb4c7c 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.migration.distsql.handler.query; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; -import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; @@ -40,7 +39,7 @@ public final class ShowMigrationListExecutor implements QueryableRALExecutor getRows(final ShowMigrationListStatement sqlStatement) { return pipelineJobManager.getJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(), - ((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(), + each.getTable(), each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), each.getJobMetaData().getCreateTime(), each.getJobMetaData().getStopTime())).collect(Collectors.toList()); } diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index c6cd66526876b..405d0f5a5ad84 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -50,8 +50,8 @@ import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; -import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI; import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress; @@ -290,10 +290,10 @@ public YamlCDCJobConfigurationSwapper getYamlJobConfigurationSwapper() { } @Override - public TableBasedPipelineJobInfo getJobInfo(final String jobId) { + public PipelineJobInfo getJobInfo(final String jobId) { PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)); CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobId); - return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames())); + return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames())); } @Override diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 6ffa6371715b4..a882b5ddb2934 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -39,8 +39,8 @@ import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier; import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable; import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; -import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm; import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder; import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor; @@ -209,12 +209,12 @@ private Map buildTargetTableSchemaMap(final Map sourceTables = new LinkedList<>(); new PipelineJobManager(this).getJobConfiguration(jobId).getJobShardingDataNodes() .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode))))); - return new TableBasedPipelineJobInfo(jobMetaData, String.join(",", sourceTables)); + return new PipelineJobInfo(jobMetaData, String.join(",", sourceTables)); } @Override From 440e3b311fe0be40bb1995f0ce967d5bd5b5ec4e Mon Sep 17 00:00:00 2001 From: zhangliang Date: Tue, 21 Nov 2023 17:14:04 +0800 Subject: [PATCH 2/2] Refactor PipelineJobInfo --- .../data/pipeline/common/pojo/PipelineJobInfo.java | 6 ------ .../scenario/migration/api/impl/MigrationJobAPI.java | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/PipelineJobInfo.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/PipelineJobInfo.java index 8377dd0869441..44ba79c520604 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/PipelineJobInfo.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/PipelineJobInfo.java @@ -33,10 +33,4 @@ public final class PipelineJobInfo { // TODO Rename private final String table; - - public PipelineJobInfo(final PipelineJobMetaData jobMetaData, final String table) { - this.jobMetaData = jobMetaData; - databaseName = null; - this.table = table; - } } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index a882b5ddb2934..d80211588ae92 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -214,7 +214,7 @@ public PipelineJobInfo getJobInfo(final String jobId) { List sourceTables = new LinkedList<>(); new PipelineJobManager(this).getJobConfiguration(jobId).getJobShardingDataNodes() .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode))))); - return new PipelineJobInfo(jobMetaData, String.join(",", sourceTables)); + return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables)); } @Override