Skip to content

Commit

Permalink
Improve job progress persisting (#30441)
Browse files Browse the repository at this point in the history
* Improve unhandled event count measuring in job progress persist service

* Try to persist job progress immediately when job item execute successfully
  • Loading branch information
sandynz authored Mar 9, 2024
1 parent d77494b commit ee48bc1
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
Expand Down Expand Up @@ -87,6 +88,9 @@ public final void execute(final ShardingContext shardingContext) {
boolean started = false;
try {
started = execute(buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext));
if (started) {
PipelineJobProgressPersistService.persistNow(jobId, shardingItem);
}
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import lombok.RequiredArgsConstructor;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicLong;

/**
* Pipeline job progress persist context.
Expand All @@ -34,9 +34,7 @@ public final class PipelineJobProgressPersistContext {

private final int shardingItem;

private final AtomicBoolean hasNewEvents = new AtomicBoolean(false);

private final AtomicReference<Long> beforePersistingProgressMillis = new AtomicReference<>(null);
private final AtomicLong unhandledEventCount = new AtomicLong(0);

private final AtomicBoolean firstExceptionLogged = new AtomicBoolean(false);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

Expand Down Expand Up @@ -85,7 +86,7 @@ public static void notifyPersist(final String jobId, final int shardingItem) {
}

private static void notifyPersist(final PipelineJobProgressPersistContext persistContext) {
persistContext.getHasNewEvents().set(true);
persistContext.getUnhandledEventCount().incrementAndGet();
}

private static Optional<PipelineJobProgressPersistContext> getPersistContext(final String jobId, final int shardingItem) {
Expand All @@ -101,8 +102,8 @@ private static Optional<PipelineJobProgressPersistContext> getPersistContext(fin
*/
public static void persistNow(final String jobId, final int shardingItem) {
getPersistContext(jobId, shardingItem).ifPresent(persistContext -> {
if (null == persistContext.getBeforePersistingProgressMillis().get()) {
log.warn("Force persisting progress is not permitted since there is no previous persisting, jobId={}, shardingItem={}", jobId, shardingItem);
if (persistContext.getUnhandledEventCount().get() <= 0) {
log.info("Force persisting progress is not permitted since there is no unhandled event, jobId={}, shardingItem={}", jobId, shardingItem);
return;
}
notifyPersist(persistContext);
Expand Down Expand Up @@ -135,23 +136,19 @@ private static synchronized void persist(final String jobId, final int shardingI
}

private static void persist0(final String jobId, final int shardingItem, final PipelineJobProgressPersistContext persistContext) {
Long beforePersistingProgressMillis = persistContext.getBeforePersistingProgressMillis().get();
if ((null == beforePersistingProgressMillis || System.currentTimeMillis() - beforePersistingProgressMillis < TimeUnit.SECONDS.toMillis(DELAY_SECONDS))
&& !persistContext.getHasNewEvents().get()) {
long currentUnhandledEventCount = persistContext.getUnhandledEventCount().get();
ShardingSpherePreconditions.checkState(currentUnhandledEventCount >= 0, () -> new IllegalStateException("Current unhandled event count must be greater than or equal to 0"));
if (0 == currentUnhandledEventCount) {
return;
}
Optional<PipelineJobItemContext> jobItemContext = PipelineJobRegistry.getItemContext(jobId, shardingItem);
if (!jobItemContext.isPresent()) {
return;
}
if (null == beforePersistingProgressMillis) {
persistContext.getBeforePersistingProgressMillis().set(System.currentTimeMillis());
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
new PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobId).getType()).getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
persistContext.getUnhandledEventCount().addAndGet(-currentUnhandledEventCount);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost {} ms", jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
}
Expand Down

0 comments on commit ee48bc1

Please sign in to comment.