Skip to content

Commit

Permalink
Refactor PipelineJobRunnerManager (apache#32731)
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu authored Aug 30, 2024
1 parent 6f0d325 commit ccb5cb5
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ private TransmissionProcessContext createTransmissionProcessContext(final String
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
log.info("Execute job {}-{}", jobId, shardingItem);
log.info("Execute job {}-{}.", jobId, shardingItem);
if (jobRunnerManager.isStopping()) {
log.info("Stopping true, ignore");
log.info("Job is stopping, ignore.");
return;
}
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
Expand Down Expand Up @@ -115,7 +115,7 @@ private boolean execute(final I jobItemContext) {
String jobId = jobItemContext.getJobId();
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
prepare(jobItemContext);
log.info("Start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId, shardingItem);
tasksRunner.start();
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
Expand All @@ -50,6 +49,8 @@ public final class PipelineJobRunnerManager {

private static final long JOB_WAITING_TIMEOUT_MILLS = 2000L;

private final PipelineJobRunnerCleaner cleaner;

private final AtomicBoolean stopping = new AtomicBoolean(false);

private final AtomicReference<JobBootstrap> jobBootstrap = new AtomicReference<>();
Expand All @@ -59,8 +60,6 @@ public final class PipelineJobRunnerManager {
@Getter
private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();

private final PipelineJobRunnerCleaner cleaner;

public PipelineJobRunnerManager() {
this(null);
}
Expand Down Expand Up @@ -99,7 +98,7 @@ public Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
* @return sharding items
*/
public Collection<Integer> getShardingItems() {
return new ArrayList<>(tasksRunners.keySet());
return tasksRunners.keySet();
}

/**
Expand All @@ -111,7 +110,7 @@ public Collection<Integer> getShardingItems() {
*/
public boolean addTasksRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
if (null != tasksRunners.putIfAbsent(shardingItem, tasksRunner)) {
log.warn("shardingItem {} tasks runner exists, ignore", shardingItem);
log.warn("Tasks runner on sharding item {} exists, ignore.", shardingItem);
return false;
}
String jobId = tasksRunner.getJobItemContext().getJobId();
Expand Down

0 comments on commit ccb5cb5

Please sign in to comment.