diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java index 855bd8ed7173b..03fc042ca74a8 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java @@ -29,6 +29,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress; import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration; import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils; +import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType; @@ -87,6 +88,9 @@ public final void execute(final ShardingContext shardingContext) { boolean started = false; try { started = execute(buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext)); + if (started) { + PipelineJobProgressPersistService.persistNow(jobId, shardingItem); + } // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java index d91b04a9102e1..beafba57ba3bd 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistContext.java @@ -21,7 +21,7 @@ import lombok.RequiredArgsConstructor; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicLong; /** * Pipeline job progress persist context. @@ -34,9 +34,7 @@ public final class PipelineJobProgressPersistContext { private final int shardingItem; - private final AtomicBoolean hasNewEvents = new AtomicBoolean(false); - - private final AtomicReference beforePersistingProgressMillis = new AtomicReference<>(null); + private final AtomicLong unhandledEventCount = new AtomicLong(0); private final AtomicBoolean firstExceptionLogged = new AtomicBoolean(false); } diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java index f40b1a6710a22..8c47d54074933 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java @@ -21,10 +21,11 @@ import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext; -import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType; import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry; import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils; import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager; +import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType; +import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; @@ -85,7 +86,7 @@ public static void notifyPersist(final String jobId, final int shardingItem) { } private static void notifyPersist(final PipelineJobProgressPersistContext persistContext) { - persistContext.getHasNewEvents().set(true); + persistContext.getUnhandledEventCount().incrementAndGet(); } private static Optional getPersistContext(final String jobId, final int shardingItem) { @@ -101,8 +102,8 @@ private static Optional getPersistContext(fin */ public static void persistNow(final String jobId, final int shardingItem) { getPersistContext(jobId, shardingItem).ifPresent(persistContext -> { - if (null == persistContext.getBeforePersistingProgressMillis().get()) { - log.warn("Force persisting progress is not permitted since there is no previous persisting, jobId={}, shardingItem={}", jobId, shardingItem); + if (persistContext.getUnhandledEventCount().get() <= 0) { + log.info("Force persisting progress is not permitted since there is no unhandled event, jobId={}, shardingItem={}", jobId, shardingItem); return; } notifyPersist(persistContext); @@ -135,23 +136,19 @@ private static synchronized void persist(final String jobId, final int shardingI } private static void persist0(final String jobId, final int shardingItem, final PipelineJobProgressPersistContext persistContext) { - Long beforePersistingProgressMillis = persistContext.getBeforePersistingProgressMillis().get(); - if ((null == beforePersistingProgressMillis || System.currentTimeMillis() - beforePersistingProgressMillis < TimeUnit.SECONDS.toMillis(DELAY_SECONDS)) - && !persistContext.getHasNewEvents().get()) { + long currentUnhandledEventCount = persistContext.getUnhandledEventCount().get(); + ShardingSpherePreconditions.checkState(currentUnhandledEventCount >= 0, () -> new IllegalStateException("Current unhandled event count must be greater than or equal to 0")); + if (0 == currentUnhandledEventCount) { return; } Optional jobItemContext = PipelineJobRegistry.getItemContext(jobId, shardingItem); if (!jobItemContext.isPresent()) { return; } - if (null == beforePersistingProgressMillis) { - persistContext.getBeforePersistingProgressMillis().set(System.currentTimeMillis()); - } - persistContext.getHasNewEvents().set(false); long startTimeMillis = System.currentTimeMillis(); new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, PipelineJobIdUtils.parseJobType(jobId).getType()).getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get()); - persistContext.getBeforePersistingProgressMillis().set(null); + persistContext.getUnhandledEventCount().addAndGet(-currentUnhandledEventCount); if (6 == ThreadLocalRandom.current().nextInt(100)) { log.info("persist, jobId={}, shardingItem={}, cost {} ms", jobId, shardingItem, System.currentTimeMillis() - startTimeMillis); }