From c1d6328249dd8e5067bc1cabd30f00515cbabbdb Mon Sep 17 00:00:00 2001 From: Adithya Chakilam <35785271+adithyachakilam@users.noreply.github.com> Date: Tue, 19 Nov 2024 14:49:20 -0600 Subject: [PATCH] StreamingTaskRunner: Close the rejection period updater executor service (#17490) --- .../seekablestream/SeekableStreamIndexTaskRunner.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 42dcef39bc85..a2db12d005e9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -128,6 +128,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -249,6 +250,7 @@ public enum Status private volatile DateTime minMessageTime; private volatile DateTime maxMessageTime; + private final ScheduledExecutorService rejectionPeriodUpdaterExec; public SeekableStreamIndexTaskRunner( final SeekableStreamIndexTask task, @@ -273,15 +275,15 @@ public SeekableStreamIndexTaskRunner( minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN); maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX); + rejectionPeriodUpdaterExec = Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d"); if (ioConfig.getRefreshRejectionPeriodsInMinutes() != null) { - Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d") + rejectionPeriodUpdaterExec .scheduleWithFixedDelay( this::refreshMinMaxMessageTime, ioConfig.getRefreshRejectionPeriodsInMinutes(), ioConfig.getRefreshRejectionPeriodsInMinutes(), - TimeUnit.MINUTES - ); + TimeUnit.MINUTES); } resetNextCheckpointTime(); } @@ -940,6 +942,7 @@ public void onFailure(Throwable t) toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); toolbox.getDataSegmentServerAnnouncer().unannounce(); } + rejectionPeriodUpdaterExec.shutdown(); } catch (Throwable e) { if (caughtExceptionOuter != null) {