Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move updateJobItemStatus method from PipelineJobAPI to PipelineJobManager #29076

Merged
merged 1 commit into from
Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,11 @@ public interface PipelineJobItemProgress {
* @return job status
*/
JobStatus getStatus();

/**
* Set status.
*
* @param jobStatus job status
*/
void setStatus(JobStatus jobStatus);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.shardingsphere.data.pipeline.core.job.service;

import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
Expand Down Expand Up @@ -76,15 +75,6 @@ default Optional<String> getToBeStoppedPreviousJobType() {
return Optional.empty();
}

/**
* Update job item status.
*
* @param jobId job id
* @param shardingItem sharding item
* @param status status
*/
void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);

/**
* Get pipeline job class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,24 @@ public void persistJobItemProgress(final PipelineJobItemContext jobItemContext)
.persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}

/**
* Update job item status.
*
* @param jobId job id
* @param shardingItem sharding item
* @param status status
*/
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
Optional<PipelineJobItemProgress> jobItemProgress = getJobItemProgress(jobId, shardingItem);
if (!jobItemProgress.isPresent()) {
log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem,
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
}

/**
* Update job item progress.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,6 @@ public JobOffsetInfo getJobOffsetInfo(final String jobId) {
return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
}

@Override
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
PipelineJobManager jobManager = new PipelineJobManager(this);
Optional<InventoryIncrementalJobItemProgress> jobItemProgress = jobManager.getJobItemProgress(jobId, shardingItem);
if (!jobItemProgress.isPresent()) {
return;
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem,
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
}

@Override
public Collection<DataConsistencyCheckAlgorithmInfo> listDataConsistencyCheckAlgorithms() {
Collection<DataConsistencyCheckAlgorithmInfo> result = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private synchronized void executeInventoryTask() {

private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
jobManager.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
}

private synchronized void executeIncrementalTask() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private void executeInventoryTasks(final List<CDCJobItemContext> jobItemContexts

private void updateLocalAndRemoteJobItemStatus(final PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
jobManager.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
}

private void executeIncrementalTasks(final List<CDCJobItemContext> jobItemContexts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;

import java.sql.Timestamp;
import java.time.Duration;
Expand Down Expand Up @@ -117,19 +116,6 @@ public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
return true;
}

@Override
public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
PipelineJobManager jobManager = new PipelineJobManager(this);
Optional<ConsistencyCheckJobItemProgress> jobItemProgress = jobManager.getJobItemProgress(jobId, shardingItem);
if (!jobItemProgress.isPresent()) {
log.warn("updateJobItemStatus, jobProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId, shardingItem,
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
}

/**
* Start by parent job id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ private void prepareAndCheckTargetWithLock(final MigrationJobItemContext jobItem
JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobId);
if (!offsetInfo.isTargetSchemaTableCreated()) {
jobItemContext.setStatus(JobStatus.PREPARING);
jobAPI.updateJobItemStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
jobManager.updateJobItemStatus(jobId, jobItemContext.getShardingItem(), JobStatus.PREPARING);
prepareAndCheckTarget(jobItemContext);
jobAPI.persistJobOffsetInfo(jobId, new JobOffsetInfo(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void assertSwitchClusterConfigurationSucceed() {
assertTrue(jobId.isPresent());
MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
jobManager.persistJobItemProgress(jobItemContext);
jobAPI.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
jobManager.updateJobItemStatus(jobId.get(), jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
Map<Integer, InventoryIncrementalJobItemProgress> progress = jobAPI.getJobProgress(jobConfig);
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : progress.entrySet()) {
assertThat(entry.getValue().getStatus(), is(JobStatus.EXECUTE_INVENTORY_TASK));
Expand Down Expand Up @@ -246,7 +246,7 @@ void assertRenewJobStatus() {
final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
jobManager.persistJobItemProgress(jobItemContext);
jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED);
jobManager.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED);
Optional<InventoryIncrementalJobItemProgress> actual = jobManager.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
assertTrue(actual.isPresent());
assertThat(actual.get().getStatus(), is(JobStatus.FINISHED));
Expand Down