diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/PipelineJobItemProgress.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/PipelineJobItemProgress.java index 4153e74073306..3c9770a369f1b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/PipelineJobItemProgress.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/PipelineJobItemProgress.java @@ -30,4 +30,11 @@ public interface PipelineJobItemProgress { * @return job status */ JobStatus getStatus(); + + /** + * Set status. + * + * @param jobStatus job status + */ + void setStatus(JobStatus jobStatus); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java index 168b18d88adb7..87ae49520dcc0 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java @@ -17,7 +17,6 @@ package org.apache.shardingsphere.data.pipeline.core.job.service; -import org.apache.shardingsphere.data.pipeline.common.job.JobStatus; import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob; import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper; @@ -76,15 +75,6 @@ default Optional getToBeStoppedPreviousJobType() { return Optional.empty(); } - /** - * Update job item status. - * - * @param jobId job id - * @param shardingItem sharding item - * @param status status - */ - void updateJobItemStatus(String jobId, int shardingItem, JobStatus status); - /** * Get pipeline job class. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java index e7fd3ad85e652..3eab31d08af5a 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java @@ -227,6 +227,24 @@ public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) .persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext)); } + /** + * Update job item status. + * + * @param jobId job id + * @param shardingItem sharding item + * @param status status + */ + public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) { + Optional jobItemProgress = getJobItemProgress(jobId, shardingItem); + if (!jobItemProgress.isPresent()) { + log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem); + return; + } + jobItemProgress.get().setStatus(status); + PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem, + YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get()))); + } + /** * Update job item progress. * diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java index 6b0cf4171e669..f0c85629db738 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java @@ -130,18 +130,6 @@ public JobOffsetInfo getJobOffsetInfo(final String jobId) { return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo()); } - @Override - public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) { - PipelineJobManager jobManager = new PipelineJobManager(this); - Optional jobItemProgress = jobManager.getJobItemProgress(jobId, shardingItem); - if (!jobItemProgress.isPresent()) { - return; - } - jobItemProgress.get().setStatus(status); - PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem, - YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get()))); - } - @Override public Collection listDataConsistencyCheckAlgorithms() { Collection result = new LinkedList<>(); diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java index ad9e3ccb81b06..4af5b5f3b2715 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java @@ -106,7 +106,7 @@ private synchronized void executeInventoryTask() { private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) { jobItemContext.setStatus(jobStatus); - jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus); + jobManager.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus); } private synchronized void executeIncrementalTask() { diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java index 657b7b48ccd4d..14cdb71513a0b 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java @@ -151,7 +151,7 @@ private void executeInventoryTasks(final List jobItemContexts private void updateLocalAndRemoteJobItemStatus(final PipelineJobItemContext jobItemContext, final JobStatus jobStatus) { jobItemContext.setStatus(jobStatus); - jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus); + jobManager.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus); } private void executeIncrementalTasks(final List jobItemContexts) { diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java index 4ba4d05079db2..a60fcb44d5e5a 100644 --- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java +++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java @@ -46,7 +46,6 @@ import org.apache.shardingsphere.infra.database.core.type.DatabaseType; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; -import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import java.sql.Timestamp; import java.time.Duration; @@ -117,19 +116,6 @@ public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() { return true; } - @Override - public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) { - PipelineJobManager jobManager = new PipelineJobManager(this); - Optional jobItemProgress = jobManager.getJobItemProgress(jobId, shardingItem); - if (!jobItemProgress.isPresent()) { - log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem); - return; - } - jobItemProgress.get().setStatus(status); - PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem, - YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get()))); - } - /** * Start by parent job id. * diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java index d473001b873a7..50ce83843f1db 100644 --- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java +++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java @@ -136,7 +136,7 @@ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItem JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobId); if (!offsetInfo.isTargetSchemaTableCreated()) { jobItemContext.setStatus(JobStatus.PREPARING); - jobAPI.updateJobItemStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING); + jobManager.updateJobItemStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING); prepareAndCheckTarget(jobItemContext); jobAPI.persistJobOffsetInfo(jobId, new JobOffsetInfo(true)); } diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java index a1a1c905dbd4d..6201a6cb50702 100644 --- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java +++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java @@ -213,7 +213,7 @@ void assertSwitchClusterConfigurationSucceed() { assertTrue(jobId.isPresent()); MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig); jobManager.persistJobItemProgress(jobItemContext); - jobAPI.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK); + jobManager.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK); Map progress = jobAPI.getJobProgress(jobConfig); for (Entry entry : progress.entrySet()) { assertThat(entry.getValue().getStatus(), is(JobStatus.EXECUTE_INVENTORY_TASK)); @@ -246,7 +246,7 @@ void assertRenewJobStatus() { final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration(); MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig); jobManager.persistJobItemProgress(jobItemContext); - jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED); + jobManager.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED); Optional actual = jobManager.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem()); assertTrue(actual.isPresent()); assertThat(actual.get().getStatus(), is(JobStatus.FINISHED));