From 5173c7e1a6fed394436565e4eee9cdc3693c696b Mon Sep 17 00:00:00 2001 From: zhangliang Date: Thu, 16 Nov 2023 00:14:53 +0800 Subject: [PATCH] Merge AbstractPipelineJobAPIImpl and PipelineJobManager --- .../core/job/service/PipelineJobAPI.java | 43 +++++++++------ .../core/job/service/PipelineJobManager.java | 49 +++++++++++++++++ ...bstractInventoryIncrementalJobAPIImpl.java | 2 +- .../impl/AbstractPipelineJobAPIImpl.java | 54 ------------------- .../handler/update/StartMigrationUpdater.java | 5 +- .../api/impl/ConsistencyCheckJobAPI.java | 15 +----- .../migration/api/impl/MigrationJobAPI.java | 15 +----- .../api/impl/MigrationJobAPITest.java | 2 +- 8 files changed, 85 insertions(+), 100 deletions(-) delete mode 100644 kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java index 0d9e76012ec49..134bc7e7e16d1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java @@ -66,13 +66,6 @@ public interface PipelineJobAPI extends TypedSPI { */ PipelineProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig); - /** - * Start disabled job. - * - * @param jobId job id - */ - void startDisabledJob(String jobId); - /** * Get job configuration. * @@ -89,6 +82,33 @@ public interface PipelineJobAPI extends TypedSPI { */ PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO jobConfigPOJO); + /** + * Whether to ignore to start disabled job when job item progress is finished. + * + * @return ignore to start disabled job when job item progress is finished or not + */ + default boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() { + return false; + } + + /** + * Get to be start disabled next job type. + * + * @return to be start disabled next job type + */ + default Optional getToBeStartDisabledNextJobType() { + return Optional.empty(); + } + + /** + * Get to be stopped previous job type. + * + * @return to be stopped previous job type + */ + default Optional getToBeStoppedPreviousJobType() { + return Optional.empty(); + } + /** * Get pipeline job info. * @@ -129,15 +149,6 @@ public interface PipelineJobAPI extends TypedSPI { */ void updateJobItemStatus(String jobId, int shardingItem, JobStatus status); - /** - * Get to be stopped previous job type. - * - * @return to be stopped previous job type - */ - default Optional getToBeStoppedPreviousJobType() { - return Optional.empty(); - } - /** * Get pipeline job class. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index d6ae265df40d7..47d8eecad2e43 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -22,11 +22,14 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration; import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey; +import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; +import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo; import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI; import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier; import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException; +import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo; @@ -73,6 +76,52 @@ public Optional start(final PipelineJobConfiguration jobConfig) { return Optional.of(jobId); } + /** + * Start disabled job. + * + * @param jobId job id + */ + public void startDisabledJob(final String jobId) { + if (pipelineJobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) { + Optional jobItemProgress = pipelineJobAPI.getJobItemProgress(jobId, 0); + if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) { + log.info("job status is FINISHED, ignore, jobId={}", jobId); + return; + } + } + startCurrentDisabledJob(jobId); + pipelineJobAPI.getToBeStartDisabledNextJobType().ifPresent(optional -> startNextDisabledJob(jobId, optional)); + + } + + private void startCurrentDisabledJob(final String jobId) { + PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId)); + pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId)); + JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); + ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId)); + jobConfigPOJO.setDisabled(false); + jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); + jobConfigPOJO.getProps().remove("stop_time"); + jobConfigPOJO.getProps().remove("stop_time_millis"); + jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1)); + String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId); + pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount()); + PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO); + pipelineDistributedBarrier.await(barrierEnablePath, 5L, TimeUnit.SECONDS); + } + + private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) { + PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional -> { + try { + new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, toBeStartDisabledNextJobType)).startDisabledJob(optional); + // CHECKSTYLE:OFF + } catch (final RuntimeException ex) { + // CHECKSTYLE:ON + log.warn("start related check job failed, check job id: {}, error: {}", optional, ex.getMessage()); + } + }); + } + /** * Stop pipeline job. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java index 89cf4265caf01..bfc7e8e9fd416 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java @@ -69,7 +69,7 @@ * Abstract inventory incremental job API implementation. */ @Slf4j -public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPipelineJobAPIImpl implements InventoryIncrementalJobAPI { +public abstract class AbstractInventoryIncrementalJobAPIImpl implements InventoryIncrementalJobAPI { private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java deleted file mode 100644 index 3031f6bb0070b..0000000000000 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.shardingsphere.data.pipeline.core.job.service.impl; - -import lombok.extern.slf4j.Slf4j; -import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode; -import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier; -import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException; -import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; -import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; -import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; - -import java.util.concurrent.TimeUnit; - -/** - * Abstract pipeline job API impl. - */ -@Slf4j -public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI { - - @Override - public void startDisabledJob(final String jobId) { - PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId)); - pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId)); - JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId); - ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId)); - jobConfigPOJO.setDisabled(false); - jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis())); - jobConfigPOJO.getProps().remove("stop_time"); - jobConfigPOJO.getProps().remove("stop_time_millis"); - jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1)); - String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId); - pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount()); - PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO); - pipelineDistributedBarrier.await(barrierEnablePath, 5, TimeUnit.SECONDS); - } -} diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java index fe1629adaaf89..7c504a3f4fc1d 100644 --- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java +++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.migration.distsql.handler.update; +import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI; import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater; import org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement; @@ -26,11 +27,11 @@ */ public final class StartMigrationUpdater implements RALUpdater { - private final MigrationJobAPI jobAPI = new MigrationJobAPI(); + private final PipelineJobManager jobManager = new PipelineJobManager(new MigrationJobAPI()); @Override public void executeUpdate(final String databaseName, final StartMigrationStatement sqlStatement) { - jobAPI.startDisabledJob(sqlStatement.getJobId()); + jobManager.startDisabledJob(sqlStatement.getJobId()); } @Override diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index feca4cc623aa7..ddbb7e7a89e4d 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -43,7 +43,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; -import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractPipelineJobAPIImpl; import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob; import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId; @@ -78,7 +77,7 @@ * Consistency check job API. */ @Slf4j -public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl { +public final class ConsistencyCheckJobAPI implements PipelineJobAPI { private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); @@ -181,23 +180,13 @@ public void updateJobItemStatus(final String jobId, final int shardingItem, fina YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get()))); } - @Override - public void startDisabledJob(final String jobId) { - Optional jobItemProgress = getJobItemProgress(jobId, 0); - if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) { - log.info("job status is FINISHED, ignore, jobId={}", jobId); - return; - } - super.startDisabledJob(jobId); - } - /** * Start by parent job id. * * @param parentJobId parent job id */ public void startByParentJobId(final String parentJobId) { - startDisabledJob(getLatestCheckJobId(parentJobId)); + new PipelineJobManager(this).startDisabledJob(getLatestCheckJobId(parentJobId)); } private String getLatestCheckJobId(final String parentJobId) { diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java index a8cef9e3334d7..1fe00789a8286 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java @@ -54,7 +54,6 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory; -import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager; import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl; import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService; @@ -81,7 +80,6 @@ import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit; -import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.infra.util.json.JsonUtils; import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration; import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; @@ -295,17 +293,8 @@ public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final } @Override - public void startDisabledJob(final String jobId) { - super.startDisabledJob(jobId); - PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional -> { - try { - TypedSPILoader.getService(PipelineJobAPI.class, "CONSISTENCY_CHECK").startDisabledJob(optional); - // CHECKSTYLE:OFF - } catch (final RuntimeException ex) { - // CHECKSTYLE:ON - log.warn("start related check job failed, check job id: {}, error: {}", optional, ex.getMessage()); - } - }); + public Optional getToBeStartDisabledNextJobType() { + return Optional.of("CONSISTENCY_CHECK"); } @Override diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index 8126f87c989cb..e206c20287567 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -134,7 +134,7 @@ void assertStartOrStopById() { when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier); jobManager.stop(jobId.get()); assertTrue(getJobConfigurationPOJO(jobId.get()).isDisabled()); - jobAPI.startDisabledJob(jobId.get()); + jobManager.startDisabledJob(jobId.get()); assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled()); }