diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 4cbf0ccfa688..8b4845b08d07 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; @@ -852,7 +853,7 @@ public String getType() */ private final ScheduledExecutorService reportingExec; /** - * Multi-threaded executor for managing communications with workers, including handling callbacks from worker RPCs. + * Multi-threaded executor for callbacks from worker RPCs. * Also serves as the connectExec for {@link #taskClient}. */ private final ListeningScheduledExecutorService workerExec; @@ -2314,7 +2315,7 @@ private void verifyAndMergeCheckpoints(final Collection taskGroupsToV final List> futures = new ArrayList<>(); for (TaskGroup taskGroup : taskGroupsToVerify) { //noinspection unchecked - futures.add((ListenableFuture) workerExec.submit(() -> verifyAndMergeCheckpoints(taskGroup))); + futures.add((ListenableFuture) verifyAndMergeCheckpoints(taskGroup)); } try { @@ -2327,16 +2328,11 @@ private void verifyAndMergeCheckpoints(final Collection taskGroupsToV } /** - * This method does two things - - * 1. Makes sure the checkpoints information in the taskGroup is consistent with that of the tasks, if not kill - * inconsistent tasks. - * 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly - * created tasks for the taskGroup start indexing from after the latest published sequences. + * Calls {@link SeekableStreamIndexTaskClient#getCheckpointsAsync(String, boolean)} on each task in the group, + * then calls {@link #verifyAndMergeCheckpoints(TaskGroup, List, List)} as a callback in {@link #workerExec}. */ - private void verifyAndMergeCheckpoints(final TaskGroup taskGroup) + private ListenableFuture verifyAndMergeCheckpoints(final TaskGroup taskGroup) { - final int groupId = taskGroup.groupId; - final List>>> taskSequences = new ArrayList<>(); final List>>> futures = new ArrayList<>(); final List taskIds = new ArrayList<>(); @@ -2350,30 +2346,48 @@ private void verifyAndMergeCheckpoints(final TaskGroup taskGroup) taskIds.add(taskId); } - try { - List>>> futuresResult = - coalesceAndAwait(futures); - - for (int i = 0; i < futuresResult.size(); i++) { - final Either>> futureResult = - futuresResult.get(i); - final String taskId = taskIds.get(i); - if (futureResult.isError()) { - final Throwable e = new RuntimeException(futureResult.error()); - stateManager.recordThrowableEvent(e); - log.error(e, "Problem while getting checkpoints for task [%s], killing the task", taskId); - killTask(taskId, "Exception[%s] while getting checkpoints", e.getClass()); - taskGroup.tasks.remove(taskId); - } else if (futureResult.valueOrThrow().isEmpty()) { - log.warn("Ignoring task [%s], as probably it is not started running yet", taskId); - } else { - taskSequences.add(new Pair<>(taskId, futureResult.valueOrThrow())); - } + return Futures.transform( + FutureUtils.coalesce(futures), + futuresResult -> { + verifyAndMergeCheckpoints(taskGroup, taskIds, futuresResult); + return null; + }, + workerExec + ); + } + + /** + * This method does two things in {@link #workerExec} - + * 1. Makes sure the checkpoints information in the taskGroup is consistent with that of the tasks, if not kill + * inconsistent tasks. + * 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly + * created tasks for the taskGroup start indexing from after the latest published sequences. + */ + private void verifyAndMergeCheckpoints( + final TaskGroup taskGroup, + final List taskIds, + final List>>> checkpointResults + ) + { + final int groupId = taskGroup.groupId; + final List>>> taskSequences = new ArrayList<>(); + + for (int i = 0; i < checkpointResults.size(); i++) { + final Either>> checkpointResult = + checkpointResults.get(i); + final String taskId = taskIds.get(i); + if (checkpointResult.isError()) { + final Throwable e = new RuntimeException(checkpointResult.error()); + stateManager.recordThrowableEvent(e); + log.error(e, "Problem while getting checkpoints for task [%s], killing the task", taskId); + killTask(taskId, "Exception[%s] while getting checkpoints", e.getClass()); + taskGroup.tasks.remove(taskId); + } else if (checkpointResult.valueOrThrow().isEmpty()) { + log.warn("Ignoring task [%s], as probably it is not started running yet", taskId); + } else { + taskSequences.add(new Pair<>(taskId, checkpointResult.valueOrThrow())); } } - catch (Exception e) { - throw new RuntimeException(e); - } final DataSourceMetadata rawDataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata( dataSource); @@ -3361,13 +3375,12 @@ public Map apply(@Nullable Object input) pauseFutures.add(taskClient.pauseAsync(taskId)); } - return Futures.transform( + return Futures.transformAsync( FutureUtils.coalesce(pauseFutures), - new Function>>, Map>() + new AsyncFunction>>, Map>() { - @Nullable @Override - public Map apply(List>> input) + public ListenableFuture> apply(List>> input) { // 3) Build a map of the highest sequence read by any task in the group for each partition final Map endOffsets = new HashMap<>(); @@ -3408,50 +3421,54 @@ public Map apply(List> results = coalesceAndAwait(setEndOffsetFutures); - for (int i = 0; i < results.size(); i++) { - if (results.get(i).isValue() && Boolean.valueOf(true).equals(results.get(i).valueOrThrow())) { - log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i)); - } else { - String taskId = setEndOffsetTaskIds.get(i); - killTask(taskId, "Failed to set end offsets, killing task"); - taskGroup.tasks.remove(taskId); - } - } - } - catch (Exception e) { - log.error("An exception occurred while setting end offsets: [%s]", e.getMessage()); - throw new RuntimeException(e); } - if (taskGroup.tasks.isEmpty()) { - log.info("All tasks in taskGroup[%d] have failed, tasks will be re-created", taskGroup.groupId); - return null; + log.info( + "Setting endOffsets for tasks in taskGroup[%d] to [%s]", + taskGroup.groupId, endOffsets + ); + for (final String taskId : setEndOffsetTaskIds) { + setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize)); } - return endOffsets; + return Futures.transform( + FutureUtils.coalesce(setEndOffsetFutures), + results -> { + try { + for (int i = 0; i < results.size(); i++) { + if (results.get(i).isValue() && Boolean.valueOf(true).equals(results.get(i).valueOrThrow())) { + log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i)); + } else { + String taskId = setEndOffsetTaskIds.get(i); + killTask(taskId, "Failed to set end offsets, killing task"); + taskGroup.tasks.remove(taskId); + } + } + } + catch (Exception e) { + log.error("An exception occurred while setting end offsets: [%s]", e.getMessage()); + throw new RuntimeException(e); + } + + if (taskGroup.tasks.isEmpty()) { + log.info("All tasks in taskGroup[%d] have failed, tasks will be re-created", taskGroup.groupId); + return null; + } + + return endOffsets; + }, + workerExec + ); } }, workerExec