Skip to content

Commit

Permalink
Remove AbstractPipelineJob.jobType and jobId (#29344)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Dec 9, 2023
1 parent 85d38e4 commit 260635d
Show file tree
Hide file tree
Showing 10 changed files with 9 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@
@Slf4j
public abstract class AbstractInseparablePipelineJob<T extends PipelineJobItemContext> extends AbstractPipelineJob {

protected AbstractInseparablePipelineJob(final String jobId) {
super(jobId);
}

@Override
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
Expand Down Expand Up @@ -130,7 +126,6 @@ private void updateJobItemStatus(final T jobItemContext, final PipelineJobType j
}

private void executeIncrementalTasks(final PipelineJobType jobType, final Collection<T> jobItemContexts) {
log.info("Execute incremental tasks, jobId={}", getJobId());
Collection<CompletableFuture<?>> futures = new LinkedList<>();
for (T each : jobItemContexts) {
if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,16 @@

package org.apache.shardingsphere.data.pipeline.core.job;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.util.ArrayList;
Expand All @@ -49,23 +45,12 @@ public abstract class AbstractPipelineJob implements PipelineJob {

private static final long JOB_WAITING_TIMEOUT_MILLS = 2000L;

@Getter
private final String jobId;

@Getter(AccessLevel.PROTECTED)
private final PipelineJobType jobType;

private final AtomicBoolean stopping = new AtomicBoolean(false);

private final AtomicReference<JobBootstrap> jobBootstrap = new AtomicReference<>();

private final Map<Integer, PipelineTasksRunner> tasksRunners = new ConcurrentHashMap<>();

protected AbstractPipelineJob(final String jobId) {
this.jobId = jobId;
jobType = TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType());
}

/**
* Is stopping.
*
Expand Down Expand Up @@ -107,16 +92,17 @@ protected final boolean addTasksRunner(final int shardingItem, final PipelineTas

@Override
public final void stop() {
Optional<String> jobId = tasksRunners.values().stream().findFirst().map(each -> each.getJobItemContext().getJobId());
try {
stopping.set(true);
log.info("Stop tasks runner, jobId={}", jobId);
tasksRunners.values().forEach(PipelineTasksRunner::stop);
awaitJobStopped(jobId);
jobId.ifPresent(this::awaitJobStopped);
if (null != jobBootstrap.get()) {
jobBootstrap.get().shutdown();
}
} finally {
PipelineJobProgressPersistService.remove(jobId);
jobId.ifPresent(PipelineJobProgressPersistService::remove);
tasksRunners.values().stream().map(each -> each.getJobItemContext().getJobProcessContext()).forEach(QuietlyCloser::close);
clean();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@
@Slf4j
public abstract class AbstractSeparablePipelineJob<T extends PipelineJobItemContext> extends AbstractPipelineJob {

protected AbstractSeparablePipelineJob(final String jobId) {
super(jobId);
}

@Override
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemConte

private final CDCJobPreparer jobPreparer;

public CDCJob(final String jobId, final PipelineSink sink) {
super(jobId);
public CDCJob(final PipelineSink sink) {
this.sink = sink;
jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
jobItemManager = new PipelineJobItemManager<>(new CDCJobType().getYamlJobItemProgressSwapper());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private static TransmissionJobItemProgress getTransmissionJobItemProgress(final
* @param sink sink
*/
public void start(final String jobId, final PipelineSink sink) {
CDCJob job = new CDCJob(jobId, sink);
CDCJob job = new CDCJob(sink);
PipelineJobRegistry.add(jobId, job);
enable(jobId);
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@
*/
public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob<ConsistencyCheckJobItemContext> {

public ConsistencyCheckJob(final String jobId) {
super(jobId);
}

@Override
public ConsistencyCheckJobItemContext buildJobItemContext(final ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected void onDeleted(final JobConfiguration jobConfig) {

@Override
protected AbstractPipelineJob buildPipelineJob(final String jobId) {
return new ConsistencyCheckJob(jobId);
return new ConsistencyCheckJob();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ public final class MigrationJob extends AbstractSeparablePipelineJob<MigrationJo
// Shared by all sharding items
private final MigrationJobPreparer jobPreparer;

public MigrationJob(final String jobId) {
super(jobId);
public MigrationJob() {
jobItemManager = new PipelineJobItemManager<>(new MigrationJobType().getYamlJobItemProgressSwapper());
processConfigPersistService = new PipelineProcessConfigurationPersistService();
dataSourceManager = new DefaultPipelineDataSourceManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected void onDeleted(final JobConfiguration jobConfig) {

@Override
protected AbstractPipelineJob buildPipelineJob(final String jobId) {
return new MigrationJob(jobId);
return new MigrationJob();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void assertBuildPipelineJobItemContext() {
Map<String, Object> expectTableCheckPosition = Collections.singletonMap("t_order", 100);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId, 0,
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob(checkJobId);
ConsistencyCheckJob consistencyCheckJob = new ConsistencyCheckJob();
ConsistencyCheckJobItemContext actual = consistencyCheckJob.buildJobItemContext(
new ShardingContext(checkJobId, "", 1, YamlEngine.marshal(createYamlConsistencyCheckJobConfiguration(checkJobId)), 0, ""));
assertThat(actual.getProgressContext().getSourceTableCheckPositions(), is(expectTableCheckPosition));
Expand Down

0 comments on commit 260635d

Please sign in to comment.