From f8a420f2f55a371801796e6331af2397ad9941cb Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Sun, 12 Nov 2023 08:53:27 +0800 Subject: [PATCH] Move PipelineJobAPI.marshalJobId() to PipelineJobId.marshal() (#29011) --- .../core/job/service/PipelineJobAPI.java | 9 ++-- .../core/job/service/PipelineJobManager.java | 52 +++++++++++++++++++ ...bstractInventoryIncrementalJobAPIImpl.java | 5 +- .../impl/AbstractPipelineJobAPIImpl.java | 18 ------- .../query/ShowStreamingListExecutor.java | 5 +- .../query/ShowMigrationListExecutor.java | 5 +- .../data/pipeline/cdc/api/impl/CDCJobAPI.java | 2 +- .../api/impl/ConsistencyCheckJobAPI.java | 2 +- .../migration/api/impl/MigrationJobAPI.java | 2 +- 9 files changed, 66 insertions(+), 34 deletions(-) create mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java index ee0b59f54e2ad..59c4a42fb7afb 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java @@ -30,7 +30,6 @@ import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI; -import java.util.List; import java.util.Optional; /** @@ -97,11 +96,11 @@ public interface PipelineJobAPI extends TypedSPI { /** * Get pipeline job info. - * - * @param contextKey context key - * @return job info list + * + * @param jobId job ID + * @return pipeline job info */ - List list(PipelineContextKey contextKey); + PipelineJobInfo getJobInfo(String jobId); /** * Persist job item progress. diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java new file mode 100644 index 0000000000000..55a4a6b333794 --- /dev/null +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.core.job.service; + +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; +import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Pipeline job manager. + */ +@RequiredArgsConstructor +public final class PipelineJobManager { + + private final PipelineJobAPI pipelineJobAPI; + + /** + * Get pipeline jobs info. + * + * @param contextKey context key + * @return jobs info + */ + public List getPipelineJobInfos(final PipelineContextKey contextKey) { + return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each -> pipelineJobAPI.getJobInfo(each.getJobName())).collect(Collectors.toList()); + } + + private Stream getJobBriefInfos(final PipelineContextKey contextKey, final String jobType) { + return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_")) + .filter(each -> jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType())); + } +} diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java index 8d79fbf093f25..da4ead5527214 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java @@ -88,9 +88,6 @@ public PipelineProcessConfiguration showProcessConfiguration(final PipelineConte return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, getType())); } - @Override - protected abstract TableBasedPipelineJobInfo getJobInfo(String jobId); - @Override public Map getJobProgress(final PipelineJobConfiguration jobConfig) { String jobId = jobConfig.getJobId(); @@ -111,7 +108,7 @@ public List getJobItemInfos(final String jobId) List result = new LinkedList<>(); for (Entry entry : jobProgress.entrySet()) { int shardingItem = entry.getKey(); - TableBasedPipelineJobInfo jobInfo = getJobInfo(jobId); + TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) getJobInfo(jobId); InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue(); String errorMessage = getJobItemErrorMessage(jobId, shardingItem); if (null == jobItemProgress) { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java index 35e9b000d9691..5d52a1cfb6635 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java @@ -24,7 +24,6 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; -import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI; import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier; @@ -35,18 +34,14 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; -import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** * Abstract pipeline job API impl. @@ -56,19 +51,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI { protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - @Override - public List list(final PipelineContextKey contextKey) { - return getJobBriefInfos(contextKey).map(each -> getJobInfo(each.getJobName())).collect(Collectors.toList()); - } - - private Stream getJobBriefInfos(final PipelineContextKey contextKey) { - return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_")) - .filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()).getType().equals(getType())); - } - - // TODO Add getJobInfo - protected abstract PipelineJobInfo getJobInfo(String jobId); - protected PipelineJobMetaData buildPipelineJobMetaData(final JobConfigurationPOJO jobConfigPOJO) { return new PipelineJobMetaData(jobConfigPOJO.getJobName(), !jobConfigPOJO.isDisabled(), jobConfigPOJO.getShardingTotalCount(), jobConfigPOJO.getProps().getProperty("create_time"), jobConfigPOJO.getProps().getProperty("stop_time"), jobConfigPOJO.getJobParameter()); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java index fe9676f511932..f6ca1ea172a3f 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java @@ -21,6 +21,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow; @@ -35,11 +36,11 @@ */ public final class ShowStreamingListExecutor implements QueryableRALExecutor { - private final CDCJobAPI jobAPI = new CDCJobAPI(); + private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new CDCJobAPI()); @Override public Collection getRows(final ShowStreamingListStatement sqlStatement) { - return jobAPI.list(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(), + return pipelineJobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(), ((TableBasedPipelineJobInfo) each).getDatabaseName(), ((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), each.getJobMetaData().getCreateTime(), Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList()); diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java index 94585e897539f..a731a088a97e4 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java @@ -19,6 +19,7 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI; import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor; import org.apache.shardingsphere.infra.instance.metadata.InstanceType; @@ -34,11 +35,11 @@ */ public final class ShowMigrationListExecutor implements QueryableRALExecutor { - private final MigrationJobAPI jobAPI = new MigrationJobAPI(); + private final PipelineJobManager pipelineJobManager = new PipelineJobManager(new MigrationJobAPI()); @Override public Collection getRows(final ShowMigrationListStatement sqlStatement) { - return jobAPI.list(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(), + return pipelineJobManager.getPipelineJobInfos(new PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new LocalDataQueryResultRow(each.getJobMetaData().getJobId(), ((TableBasedPipelineJobInfo) each).getTable(), each.getJobMetaData().getJobItemCount(), each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : Boolean.FALSE.toString(), each.getJobMetaData().getCreateTime(), each.getJobMetaData().getStopTime())).collect(Collectors.toList()); diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java index 81942ca429efe..88dabfe855e66 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java @@ -300,7 +300,7 @@ protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final Pipeline } @Override - protected TableBasedPipelineJobInfo getJobInfo(final String jobId) { + public TableBasedPipelineJobInfo getJobInfo(final String jobId) { JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId); PipelineJobMetaData jobMetaData = buildPipelineJobMetaData(jobConfigPOJO); CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO); diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 2f8f80c5bbd2e..7a894858c807d 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -385,7 +385,7 @@ public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfi } @Override - protected PipelineJobInfo getJobInfo(final String jobId) { + public PipelineJobInfo getJobInfo(final String jobId) { throw new UnsupportedOperationException(); } diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index 6605a1218aead..7477ebcf612ae 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -209,7 +209,7 @@ private Map buildTargetTableSchemaMap(final Map sourceTables = new LinkedList<>();