Skip to content

Commit

Permalink
Improve exception handling in AbstractPipelineJob and refactor proces…
Browse files Browse the repository at this point in the history
…sFailed (#28677)

* Improve exception handle at AbstractPipelineJob and refactor processFailed

* Move processFailed to AbstractSimplePipelineJob
  • Loading branch information
azexcy authored Oct 8, 2023
1 parent e4ec1dd commit 0e824cf
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
Expand All @@ -34,6 +33,7 @@
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.sql.SQLException;
import java.util.ArrayList;
Expand Down Expand Up @@ -90,27 +90,14 @@ protected void prepare(final PipelineJobItemContext jobItemContext) {
try {
doPrepare(jobItemContext);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
processFailed(jobItemContext, ex);
throw ex;
// CHECKSTYLE:OFF
} catch (final SQLException ex) {
// CHECKSTYLE:ON
processFailed(jobItemContext, ex);
throw new PipelineInternalException(ex);
}
}

protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws SQLException;

protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
String jobId = jobItemContext.getJobId();
log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
jobAPI.stop(jobId);
}

@Override
public Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,33 @@ public void execute(final ShardingContext shardingContext) {
log.info("stopping true, ignore");
return;
}
PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
try {
PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
execute0(jobItemContext);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
processFailed(jobId, shardingItem, ex);
throw ex;
}
}

private void execute0(final PipelineJobItemContext jobItemContext) {
String jobId = jobItemContext.getJobId();
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
if (!addTasksRunner(shardingItem, tasksRunner)) {
return;
}
getJobAPI().cleanJobItemErrorMessage(jobId, jobItemContext.getShardingItem());
getJobAPI().cleanJobItemErrorMessage(jobId, shardingItem);
prepare(jobItemContext);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
tasksRunner.start();
}

private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
getJobAPI().updateJobItemErrorMessage(jobId, shardingItem, ex);
getJobAPI().stop(jobId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@
import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.util.Collection;
import java.util.LinkedList;
Expand Down Expand Up @@ -114,17 +114,15 @@ private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
for (PipelineJobItemContext each : jobItemContexts) {
processFailed(each, ex);
processFailed(each.getJobId(), each.getShardingItem(), ex);
}
throw ex;
}
}

@Override
protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
String jobId = jobItemContext.getJobId();
log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
jobAPI.updateJobItemErrorMessage(jobId, shardingItem, ex);
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}
Expand Down

0 comments on commit 0e824cf

Please sign in to comment.