Skip to content

Commit

Permalink
Merge AbstractPipelineJobAPIImpl and PipelineJobManager
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Nov 15, 2023
1 parent 660a1da commit 5173c7e
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,6 @@ public interface PipelineJobAPI extends TypedSPI {
*/
PipelineProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);

/**
* Start disabled job.
*
* @param jobId job id
*/
void startDisabledJob(String jobId);

/**
* Get job configuration.
*
Expand All @@ -89,6 +82,33 @@ public interface PipelineJobAPI extends TypedSPI {
*/
PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);

/**
* Whether to ignore to start disabled job when job item progress is finished.
*
* @return ignore to start disabled job when job item progress is finished or not
*/
default boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
return false;
}

/**
* Get to be start disabled next job type.
*
* @return to be start disabled next job type
*/
default Optional<String> getToBeStartDisabledNextJobType() {
return Optional.empty();
}

/**
* Get to be stopped previous job type.
*
* @return to be stopped previous job type
*/
default Optional<String> getToBeStoppedPreviousJobType() {
return Optional.empty();
}

/**
* Get pipeline job info.
*
Expand Down Expand Up @@ -129,15 +149,6 @@ public interface PipelineJobAPI extends TypedSPI {
*/
void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);

/**
* Get to be stopped previous job type.
*
* @return to be stopped previous job type
*/
default Optional<String> getToBeStoppedPreviousJobType() {
return Optional.empty();
}

/**
* Get pipeline job class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
Expand Down Expand Up @@ -73,6 +76,52 @@ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
return Optional.of(jobId);
}

/**
* Start disabled job.
*
* @param jobId job id
*/
public void startDisabledJob(final String jobId) {
if (pipelineJobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
Optional<? extends PipelineJobItemProgress> jobItemProgress = pipelineJobAPI.getJobItemProgress(jobId, 0);
if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) {
log.info("job status is FINISHED, ignore, jobId={}", jobId);
return;
}
}
startCurrentDisabledJob(jobId);
pipelineJobAPI.getToBeStartDisabledNextJobType().ifPresent(optional -> startNextDisabledJob(jobId, optional));

}

private void startCurrentDisabledJob(final String jobId) {
PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId));
jobConfigPOJO.setDisabled(false);
jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
jobConfigPOJO.getProps().remove("stop_time");
jobConfigPOJO.getProps().remove("stop_time_millis");
jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1));
String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount());
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
pipelineDistributedBarrier.await(barrierEnablePath, 5L, TimeUnit.SECONDS);
}

private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) {
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional -> {
try {
new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, toBeStartDisabledNextJobType)).startDisabledJob(optional);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
log.warn("start related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
}
});
}

/**
* Stop pipeline job.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
* Abstract inventory incremental job API implementation.
*/
@Slf4j
public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPipelineJobAPIImpl implements InventoryIncrementalJobAPI {
public abstract class AbstractInventoryIncrementalJobAPIImpl implements InventoryIncrementalJobAPI {

private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.migration.distsql.handler.update;

import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement;
Expand All @@ -26,11 +27,11 @@
*/
public final class StartMigrationUpdater implements RALUpdater<StartMigrationStatement> {

private final MigrationJobAPI jobAPI = new MigrationJobAPI();
private final PipelineJobManager jobManager = new PipelineJobManager(new MigrationJobAPI());

@Override
public void executeUpdate(final String databaseName, final StartMigrationStatement sqlStatement) {
jobAPI.startDisabledJob(sqlStatement.getJobId());
jobManager.startDisabledJob(sqlStatement.getJobId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractPipelineJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
Expand Down Expand Up @@ -78,7 +77,7 @@
* Consistency check job API.
*/
@Slf4j
public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
public final class ConsistencyCheckJobAPI implements PipelineJobAPI {

private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

Expand Down Expand Up @@ -181,23 +180,13 @@ public void updateJobItemStatus(final String jobId, final int shardingItem, fina
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
}

@Override
public void startDisabledJob(final String jobId) {
Optional<ConsistencyCheckJobItemProgress> jobItemProgress = getJobItemProgress(jobId, 0);
if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) {
log.info("job status is FINISHED, ignore, jobId={}", jobId);
return;
}
super.startDisabledJob(jobId);
}

/**
* Start by parent job id.
*
* @param parentJobId parent job id
*/
public void startByParentJobId(final String parentJobId) {
startDisabledJob(getLatestCheckJobId(parentJobId));
new PipelineJobManager(this).startDisabledJob(getLatestCheckJobId(parentJobId));
}

private String getLatestCheckJobId(final String parentJobId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
Expand All @@ -81,7 +80,6 @@
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
Expand Down Expand Up @@ -295,17 +293,8 @@ public PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(final
}

@Override
public void startDisabledJob(final String jobId) {
super.startDisabledJob(jobId);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional -> {
try {
TypedSPILoader.getService(PipelineJobAPI.class, "CONSISTENCY_CHECK").startDisabledJob(optional);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
log.warn("start related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
}
});
public Optional<String> getToBeStartDisabledNextJobType() {
return Optional.of("CONSISTENCY_CHECK");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ void assertStartOrStopById() {
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
jobManager.stop(jobId.get());
assertTrue(getJobConfigurationPOJO(jobId.get()).isDisabled());
jobAPI.startDisabledJob(jobId.get());
jobManager.startDisabledJob(jobId.get());
assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled());
}

Expand Down

0 comments on commit 5173c7e

Please sign in to comment.