Refactor AbstractSimplePipelineJob (#29334)
terrymanu authored Dec 8, 2023
1 parent bcc4a28 commit 322e8e7
Showing 4 changed files with 23 additions and 34 deletions.
@@ -99,16 +99,16 @@ protected void prepare(final PipelineJobItemContext jobItemContext) {
     protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws SQLException;
     
     @Override
-    public Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
+    public final Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
         return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
     }
     
     @Override
-    public Collection<Integer> getShardingItems() {
+    public final Collection<Integer> getShardingItems() {
         return new ArrayList<>(tasksRunnerMap.keySet());
     }
     
-    protected boolean addTasksRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
+    protected final boolean addTasksRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
         if (null != tasksRunnerMap.putIfAbsent(shardingItem, tasksRunner)) {
             log.warn("shardingItem {} tasks runner exists, ignore", shardingItem);
             return false;
@@ -120,7 +120,7 @@ protected boolean addTasksRunner(final int shardingItem, final PipelineTasksRunn
     }
     
     @Override
-    public void stop() {
+    public final void stop() {
         try {
             innerStop();
         } finally {
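The hunks above fix the lifecycle skeleton of the pipeline job base class by marking getTasksRunner, getShardingItems, addTasksRunner, and stop as final, so subclasses can customize behavior only through protected hooks such as doPrepare and innerStop. That is the template method pattern; below is a minimal compilable sketch of the same shape, with illustrative names (LifecycleJob, PrintingJob) rather than the actual ShardingSphere types.

    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    
    // Illustrative template-method base class: the lifecycle skeleton is final,
    // subclasses customize behavior only through the protected hook.
    abstract class LifecycleJob {
        
        protected final ConcurrentMap<Integer, Runnable> tasksRunnerMap = new ConcurrentHashMap<>();
        
        public final Optional<Runnable> getTasksRunner(final int shardingItem) {
            return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
        }
        
        // The skeleton always releases resources, even if the subclass hook throws.
        public final void stop() {
            try {
                innerStop();
            } finally {
                tasksRunnerMap.clear();
            }
        }
        
        protected abstract void innerStop();
    }
    
    class PrintingJob extends LifecycleJob {
        
        @Override
        protected void innerStop() {
            System.out.println("subclass-specific shutdown");
        }
        
        public static void main(final String[] args) {
            new PrintingJob().stop();
        }
    }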
@@ -37,16 +37,6 @@ protected AbstractSimplePipelineJob(final String jobId) {
         super(jobId);
     }
     
-    /**
-     * Build pipeline job item context.
-     *
-     * @param shardingContext sharding context
-     * @return pipeline job item context
-     */
-    protected abstract PipelineJobItemContext buildPipelineJobItemContext(ShardingContext shardingContext);
-    
-    protected abstract PipelineTasksRunner buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
-    
     @Override
     public void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
@@ -57,8 +47,7 @@ public void execute(final ShardingContext shardingContext) {
             return;
         }
         try {
-            PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
-            execute0(jobItemContext);
+            execute(buildPipelineJobItemContext(shardingContext));
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
@@ -67,7 +56,7 @@ public void execute(final ShardingContext shardingContext) {
         }
     }
     
-    private void execute0(final PipelineJobItemContext jobItemContext) {
+    private void execute(final PipelineJobItemContext jobItemContext) {
         String jobId = jobItemContext.getJobId();
         int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
@@ -80,6 +69,10 @@ private void execute0(final PipelineJobItemContext jobItemContext) {
         tasksRunner.start();
     }
     
+    protected abstract PipelineJobItemContext buildPipelineJobItemContext(ShardingContext shardingContext);
+    
+    protected abstract PipelineTasksRunner buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
+    
     private void processFailed(final PipelineJobManager jobManager, final String jobId, final int shardingItem, final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
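This file is AbstractSimplePipelineJob itself (the constructor in its first hunk names it). The old execute0 helper becomes a private overload of execute, and the two abstract factory hooks move below the public flow that calls them, so the class reads top-down from entry point to hooks. A compilable sketch of the resulting shape, using simplified stand-in types in place of ShardingContext and PipelineJobItemContext:

    // Illustrative shape of the refactored class: public entry point first,
    // private overload second, abstract factory hooks last.
    abstract class SimplePipelineJobSketch {
        
        // Public entry point: builds the per-item context, then delegates.
        public void execute(final String shardingContext) {
            execute(buildJobItemContext(shardingContext));
        }
        
        // Private overload replaces the old execute0() helper name.
        private void execute(final Object jobItemContext) {
            Runnable tasksRunner = buildTasksRunner(jobItemContext);
            tasksRunner.run();
        }
        
        // Factory hooks for subclasses, declared after the flow that calls them.
        protected abstract Object buildJobItemContext(String shardingContext);
        
        protected abstract Runnable buildTasksRunner(Object jobItemContext);
    }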
@@ -31,9 +31,6 @@
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
-import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
@@ -43,26 +40,28 @@
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
-import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
+import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
+import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -115,8 +114,7 @@ public void execute(final ShardingContext shardingContext) {
                 return;
             }
             CDCJobItemContext jobItemContext = buildCDCJobItemContext(jobConfig, shardingItem);
-            PipelineTasksRunner tasksRunner = new CDCTasksRunner(jobItemContext);
-            if (!addTasksRunner(shardingItem, tasksRunner)) {
+            if (!addTasksRunner(shardingItem, new CDCTasksRunner(jobItemContext))) {
                 continue;
             }
             jobItemContexts.add(jobItemContext);
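Judging by its imports and the buildCDCJobItemContext call, this file is CDCJob; the change inlines the CDCTasksRunner construction into the addTasksRunner call. That guard, made final in the first file, registers one runner per sharding item through ConcurrentMap.putIfAbsent, so only the first registration wins and duplicates are skipped. A stand-alone sketch of the guard, with illustrative names:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    
    // Stand-alone sketch of the putIfAbsent registration guard behind addTasksRunner.
    final class TasksRunnerRegistry {
        
        private final ConcurrentMap<Integer, Runnable> tasksRunnerMap = new ConcurrentHashMap<>();
        
        // Returns true only for the first registration of a sharding item;
        // a repeated or concurrent registration leaves the existing runner in place.
        boolean addTasksRunner(final int shardingItem, final Runnable tasksRunner) {
            return null == tasksRunnerMap.putIfAbsent(shardingItem, tasksRunner);
        }
        
        public static void main(final String[] args) {
            TasksRunnerRegistry registry = new TasksRunnerRegistry();
            System.out.println(registry.addTasksRunner(0, () -> { }));  // true: first registration wins
            System.out.println(registry.addTasksRunner(0, () -> { }));  // false: duplicate is ignored
        }
    }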
@@ -134,9 +134,7 @@ public CDCResponse streamData(final String requestId, final StreamDataRequestBod
     public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
         CDCJobConfiguration cdcJobConfig = jobConfigManager.getJobConfiguration(jobId);
         ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
-        if (PipelineJobRegistry.isExisting(jobId)) {
-            PipelineJobRegistry.stop(jobId);
-        }
+        PipelineJobRegistry.stop(jobId);
         ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
         jobAPI.start(jobId, new CDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
         connectionContext.setJobId(jobId);
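Dropping the caller-side isExisting check is only safe if PipelineJobRegistry.stop is a no-op for job IDs that are not registered, so the idempotence lives in one place instead of at every call site. A minimal sketch of such an idempotent registry; this is a simplified stand-in, not the actual ShardingSphere implementation:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    // Simplified stand-in for a job registry whose stop() is safe to call unconditionally.
    final class JobRegistrySketch {
        
        private static final Map<String, Runnable> JOBS = new ConcurrentHashMap<>();
        
        static void register(final String jobId, final Runnable stopCallback) {
            JOBS.put(jobId, stopCallback);
        }
        
        // Idempotent: remove() yields null for an absent jobId and the call is a no-op,
        // so callers no longer need an isExisting() guard.
        static void stop(final String jobId) {
            Runnable stopCallback = JOBS.remove(jobId);
            if (null != stopCallback) {
                stopCallback.run();
            }
        }
        
        public static void main(final String[] args) {
            register("j0101", () -> System.out.println("stopping j0101"));
            stop("j0101");
            stop("missing-job");  // safe: no-op for an unknown job id
        }
    }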
