Skip to content

Commit

Permalink
Move updateJobItemStatus method from PipelineJobAPI to PipelineJobMan…
Browse files Browse the repository at this point in the history
…ager (#29076)
  • Loading branch information
terrymanu authored Nov 18, 2023
1 parent 19c6551 commit c4f1ef1
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,11 @@ public interface PipelineJobItemProgress {
* @return job status
*/
JobStatus getStatus();

/**
* Set status.
*
* @param jobStatus job status
*/
void setStatus(JobStatus jobStatus);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.shardingsphere.data.pipeline.core.job.service;

import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
Expand Down Expand Up @@ -76,15 +75,6 @@ default Optional<String> getToBeStoppedPreviousJobType() {
return Optional.empty();
}

/**
* Update job item status.
*
* @param jobId job id
* @param shardingItem sharding item
* @param status status
*/
void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);

/**
* Get pipeline job class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,24 @@ public void persistJobItemProgress(final PipelineJobItemContext jobItemContext)
.persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}

/**
* Update job item status.
*
* @param jobId job id
* @param shardingItem sharding item
* @param status status
*/
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
Optional<PipelineJobItemProgress> jobItemProgress = getJobItemProgress(jobId, shardingItem);
if (!jobItemProgress.isPresent()) {
log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem,
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
}

/**
* Update job item progress.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,6 @@ public JobOffsetInfo getJobOffsetInfo(final String jobId) {
return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
}

@Override
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
PipelineJobManager jobManager = new PipelineJobManager(this);
Optional<InventoryIncrementalJobItemProgress> jobItemProgress = jobManager.getJobItemProgress(jobId, shardingItem);
if (!jobItemProgress.isPresent()) {
return;
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem,
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
}

@Override
public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
Collection<DataConsistencyCheckAlgorithmInfo> result = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private synchronized void executeInventoryTask() {

private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
jobManager.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
}

private synchronized void executeIncrementalTask() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private void executeInventoryTasks(final List<CDCJobItemContext> jobItemContexts

private void updateLocalAndRemoteJobItemStatus(final PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
jobManager.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
}

private void executeIncrementalTasks(final List<CDCJobItemContext> jobItemContexts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;

import java.sql.Timestamp;
import java.time.Duration;
Expand Down Expand Up @@ -117,19 +116,6 @@ public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
return true;
}

@Override
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
PipelineJobManager jobManager = new PipelineJobManager(this);
Optional<ConsistencyCheckJobItemProgress> jobItemProgress = jobManager.getJobItemProgress(jobId, shardingItem);
if (!jobItemProgress.isPresent()) {
log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem,
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
}

/**
* Start by parent job id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItem
JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobId);
if (!offsetInfo.isTargetSchemaTableCreated()) {
jobItemContext.setStatus(JobStatus.PREPARING);
jobAPI.updateJobItemStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
jobManager.updateJobItemStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
prepareAndCheckTarget(jobItemContext);
jobAPI.persistJobOffsetInfo(jobId, new JobOffsetInfo(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void assertSwitchClusterConfigurationSucceed() {
assertTrue(jobId.isPresent());
MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
jobManager.persistJobItemProgress(jobItemContext);
jobAPI.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
jobManager.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
Map<Integer, InventoryIncrementalJobItemProgress> progress = jobAPI.getJobProgress(jobConfig);
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : progress.entrySet()) {
assertThat(entry.getValue().getStatus(), is(JobStatus.EXECUTE_INVENTORY_TASK));
Expand Down Expand Up @@ -246,7 +246,7 @@ void assertRenewJobStatus() {
final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
jobManager.persistJobItemProgress(jobItemContext);
jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED);
jobManager.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED);
Optional<InventoryIncrementalJobItemProgress> actual = jobManager.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
assertTrue(actual.isPresent());
assertThat(actual.get().getStatus(), is(JobStatus.FINISHED));
Expand Down

0 comments on commit c4f1ef1

Please sign in to comment.