From ccb5cb531a85c78fb78dd7a89495ad5ba95f27c7 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Fri, 30 Aug 2024 19:32:51 +0800 Subject: [PATCH] Refactor PipelineJobRunnerManager (#32731) --- .../pipeline/core/job/AbstractSeparablePipelineJob.java | 6 +++--- .../core/job/engine/PipelineJobRunnerManager.java | 9 ++++----- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java index 3cfdc634b5c61..196034b3549bb 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java @@ -75,9 +75,9 @@ private TransmissionProcessContext createTransmissionProcessContext(final String public final void execute(final ShardingContext shardingContext) { String jobId = shardingContext.getJobName(); int shardingItem = shardingContext.getShardingItem(); - log.info("Execute job {}-{}", jobId, shardingItem); + log.info("Execute job {}-{}.", jobId, shardingItem); if (jobRunnerManager.isStopping()) { - log.info("Stopping true, ignore"); + log.info("Job is stopping, ignore."); return; } PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId); @@ -115,7 +115,7 @@ private boolean execute(final I jobItemContext) { String jobId = jobItemContext.getJobId(); PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem); prepare(jobItemContext); - log.info("Start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem); + log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId, shardingItem); tasksRunner.start(); return true; } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java index a1083185d45b7..c5bb8bc0be5d1 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java @@ -33,7 +33,6 @@ import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap; import org.apache.shardingsphere.infra.util.close.QuietlyCloser; -import java.util.ArrayList; import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -50,6 +49,8 @@ public final class PipelineJobRunnerManager { private static final long JOB_WAITING_TIMEOUT_MILLS = 2000L; + private final PipelineJobRunnerCleaner cleaner; + private final AtomicBoolean stopping = new AtomicBoolean(false); private final AtomicReference jobBootstrap = new AtomicReference<>(); @@ -59,8 +60,6 @@ public final class PipelineJobRunnerManager { @Getter private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager(); - private final PipelineJobRunnerCleaner cleaner; - public PipelineJobRunnerManager() { this(null); } @@ -99,7 +98,7 @@ public Optional getTasksRunner(final int shardingItem) { * @return sharding items */ public Collection getShardingItems() { - return new ArrayList<>(tasksRunners.keySet()); + return tasksRunners.keySet(); } /** @@ -111,7 +110,7 @@ public Collection getShardingItems() { */ public boolean addTasksRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) { if (null != tasksRunners.putIfAbsent(shardingItem, tasksRunner)) { - log.warn("shardingItem {} tasks runner exists, ignore", shardingItem); + log.warn("Tasks runner on sharding item {} exists, ignore.", shardingItem); return false; } String jobId = tasksRunner.getJobItemContext().getJobId();