SeekableStreamSupervisor: Don't await task futures in workerExec. (#17403)

Following #17394, workerExec can get deadlocked with itself, because it
waits for task futures and is also used as the connectExec for the task
client. To fix this, we need to never await task futures in the workerExec.
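
As a minimal, hypothetical sketch of that deadlock (illustrative names, not the actual Druid code): a single-threaded workerExec runs an action that blocks on a future, while the callback that would complete that future is queued on the same executor and can never run.

    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import com.google.common.util.concurrent.SettableFuture;

    import java.util.concurrent.Callable;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class WorkerExecDeadlockSketch
    {
      public static void main(String[] args) throws Exception
      {
        final ListeningExecutorService workerExec =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

        // Stands in for an RPC response delivered by a task client whose
        // connectExec is workerExec itself.
        final SettableFuture<String> rpcResponse = SettableFuture.create();

        // Step 1: a supervisor action runs on workerExec and awaits the
        // response, parking the executor's only thread.
        final Callable<String> awaitResponse = rpcResponse::get;
        final ListenableFuture<String> action = workerExec.submit(awaitResponse);

        // Step 2: the "task client" delivers the response on workerExec too,
        // but this Runnable is queued behind Step 1 and can never run: deadlock.
        workerExec.execute(() -> rpcResponse.set("response"));

        try {
          action.get(2, TimeUnit.SECONDS);
        }
        catch (TimeoutException e) {
          System.out.println("deadlocked: the worker thread is parked in get()");
          workerExec.shutdownNow();
        }
      }
    }

With more worker threads the window narrows, but the hazard remains whenever enough blocking calls land on the pool at once.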

There are two specific changes: in "verifyAndMergeCheckpoints" and
"checkpointTaskGroup", two "coalesceAndAwait" calls that formerly occurred
in workerExec are replaced with Futures.transform (using a callback in
workerExec).
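
A condensed, self-contained sketch of that pattern (hypothetical class and stand-in names; Futures.allAsList stands in for Druid's FutureUtils.coalesce, and the println for the merge/verify step):

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;

    import java.util.List;
    import java.util.concurrent.Executors;

    public class TransformInsteadOfAwaitSketch
    {
      public static void main(String[] args) throws Exception
      {
        final ListeningExecutorService workerExec =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

        // Stand-ins for the per-task checkpoint futures.
        final List<ListenableFuture<String>> futures = List.of(
            Futures.immediateFuture("task-0 checkpoints"),
            Futures.immediateFuture("task-1 checkpoints")
        );

        // Before: coalesceAndAwait(futures) would block a workerExec thread
        // (and deadlock if called *from* workerExec, as in the bug).

        // After: attach the merge step as a callback that runs on workerExec
        // once all futures resolve; nothing blocks in the meantime.
        final ListenableFuture<?> merged = Futures.transform(
            Futures.allAsList(futures),  // stand-in for FutureUtils.coalesce
            results -> {
              System.out.println("merging: " + results);  // stand-in for merge/verify
              return null;
            },
            workerExec
        );

        merged.get();
        workerExec.shutdown();
      }
    }

The trade-off is that the caller now receives a future for the merge step rather than its result, which is why verifyAndMergeCheckpoints changes to return ListenableFuture<?> in the diff below.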

Because this adjustment removes a source of blocking, it may also improve
supervisor responsiveness for high task counts. This is not the primary
goal, however. The primary goal is to fix the bug introduced by #17394.
gianm authored Oct 24, 2024
1 parent 7e8671c commit c4b513e
Showing 1 changed file with 90 additions and 73 deletions.
@@ -31,6 +31,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
@@ -852,7 +853,7 @@ public String getType()
*/
private final ScheduledExecutorService reportingExec;
/**
* Multi-threaded executor for managing communications with workers, including handling callbacks from worker RPCs.
* Multi-threaded executor for callbacks from worker RPCs.
* Also serves as the connectExec for {@link #taskClient}.
*/
private final ListeningScheduledExecutorService workerExec;
@@ -2314,7 +2315,7 @@ private void verifyAndMergeCheckpoints(final Collection<TaskGroup> taskGroupsToVerify)
final List<ListenableFuture<Object>> futures = new ArrayList<>();
for (TaskGroup taskGroup : taskGroupsToVerify) {
//noinspection unchecked
futures.add((ListenableFuture<Object>) workerExec.submit(() -> verifyAndMergeCheckpoints(taskGroup)));
futures.add((ListenableFuture<Object>) verifyAndMergeCheckpoints(taskGroup));
}

try {
@@ -2327,16 +2328,11 @@ private void verifyAndMergeCheckpoints(final Collection<TaskGroup> taskGroupsToVerify)
}

/**
* This method does two things -
* 1. Makes sure the checkpoints information in the taskGroup is consistent with that of the tasks, if not kill
* inconsistent tasks.
* 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly
* created tasks for the taskGroup start indexing from after the latest published sequences.
* Calls {@link SeekableStreamIndexTaskClient#getCheckpointsAsync(String, boolean)} on each task in the group,
* then calls {@link #verifyAndMergeCheckpoints(TaskGroup, List, List)} as a callback in {@link #workerExec}.
*/
private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
private ListenableFuture<?> verifyAndMergeCheckpoints(final TaskGroup taskGroup)
{
final int groupId = taskGroup.groupId;
final List<Pair<String, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>> taskSequences = new ArrayList<>();
final List<ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>> futures = new ArrayList<>();
final List<String> taskIds = new ArrayList<>();

@@ -2350,30 +2346,48 @@ private void verifyAndMergeCheckpoints(final TaskGroup taskGroup)
taskIds.add(taskId);
}

try {
List<Either<Throwable, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>> futuresResult =
coalesceAndAwait(futures);

for (int i = 0; i < futuresResult.size(); i++) {
final Either<Throwable, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> futureResult =
futuresResult.get(i);
final String taskId = taskIds.get(i);
if (futureResult.isError()) {
final Throwable e = new RuntimeException(futureResult.error());
stateManager.recordThrowableEvent(e);
log.error(e, "Problem while getting checkpoints for task [%s], killing the task", taskId);
killTask(taskId, "Exception[%s] while getting checkpoints", e.getClass());
taskGroup.tasks.remove(taskId);
} else if (futureResult.valueOrThrow().isEmpty()) {
log.warn("Ignoring task [%s], as probably it is not started running yet", taskId);
} else {
taskSequences.add(new Pair<>(taskId, futureResult.valueOrThrow()));
}
return Futures.transform(
FutureUtils.coalesce(futures),
futuresResult -> {
verifyAndMergeCheckpoints(taskGroup, taskIds, futuresResult);
return null;
},
workerExec
);
}

/**
* This method does two things in {@link #workerExec} -
* 1. Makes sure the checkpoints information in the taskGroup is consistent with that of the tasks, if not kill
* inconsistent tasks.
* 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly
* created tasks for the taskGroup start indexing from after the latest published sequences.
*/
private void verifyAndMergeCheckpoints(
final TaskGroup taskGroup,
final List<String> taskIds,
final List<Either<Throwable, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>> checkpointResults
)
{
final int groupId = taskGroup.groupId;
final List<Pair<String, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>> taskSequences = new ArrayList<>();

for (int i = 0; i < checkpointResults.size(); i++) {
final Either<Throwable, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> checkpointResult =
checkpointResults.get(i);
final String taskId = taskIds.get(i);
if (checkpointResult.isError()) {
final Throwable e = new RuntimeException(checkpointResult.error());
stateManager.recordThrowableEvent(e);
log.error(e, "Problem while getting checkpoints for task [%s], killing the task", taskId);
killTask(taskId, "Exception[%s] while getting checkpoints", e.getClass());
taskGroup.tasks.remove(taskId);
} else if (checkpointResult.valueOrThrow().isEmpty()) {
log.warn("Ignoring task [%s], as probably it is not started running yet", taskId);
} else {
taskSequences.add(new Pair<>(taskId, checkpointResult.valueOrThrow()));
}
}
catch (Exception e) {
throw new RuntimeException(e);
}

final DataSourceMetadata rawDataSourceMetadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(
dataSource);
@@ -3361,13 +3375,12 @@ public Map<PartitionIdType, SequenceOffsetType> apply(@Nullable Object input)
pauseFutures.add(taskClient.pauseAsync(taskId));
}

return Futures.transform(
return Futures.transformAsync(
FutureUtils.coalesce(pauseFutures),
new Function<List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>>, Map<PartitionIdType, SequenceOffsetType>>()
new AsyncFunction<List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>>, Map<PartitionIdType, SequenceOffsetType>>()
{
@Nullable
@Override
public Map<PartitionIdType, SequenceOffsetType> apply(List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>> input)
public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> apply(List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>> input)
{
// 3) Build a map of the highest sequence read by any task in the group for each partition
final Map<PartitionIdType, SequenceOffsetType> endOffsets = new HashMap<>();
@@ -3408,50 +3421,54 @@ public Map<PartitionIdType, SequenceOffsetType> apply(List<Either<Throwable, Map<PartitionIdType, SequenceOffsetType>>> input)

if (setEndOffsetTaskIds.isEmpty()) {
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
return null;
return Futures.immediateFuture(null);
}

try {

if (endOffsets.equals(taskGroup.checkpointSequences.lastEntry().getValue())) {
log.warn(
"Checkpoint[%s] is same as the start sequences[%s] of latest sequence for the taskGroup[%d].",
endOffsets,
taskGroup.checkpointSequences.lastEntry().getValue(),
taskGroup.groupId
);
}

log.info(
"Setting endOffsets for tasks in taskGroup[%d] to [%s]",
taskGroup.groupId, endOffsets
if (endOffsets.equals(taskGroup.checkpointSequences.lastEntry().getValue())) {
log.warn(
"Checkpoint[%s] is same as the start sequences[%s] of latest sequence for the taskGroup[%d].",
endOffsets,
taskGroup.checkpointSequences.lastEntry().getValue(),
taskGroup.groupId
);
for (final String taskId : setEndOffsetTaskIds) {
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
}

List<Either<Throwable, Boolean>> results = coalesceAndAwait(setEndOffsetFutures);
for (int i = 0; i < results.size(); i++) {
if (results.get(i).isValue() && Boolean.valueOf(true).equals(results.get(i).valueOrThrow())) {
log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i));
} else {
String taskId = setEndOffsetTaskIds.get(i);
killTask(taskId, "Failed to set end offsets, killing task");
taskGroup.tasks.remove(taskId);
}
}
}
catch (Exception e) {
log.error("An exception occurred while setting end offsets: [%s]", e.getMessage());
throw new RuntimeException(e);
}

if (taskGroup.tasks.isEmpty()) {
log.info("All tasks in taskGroup[%d] have failed, tasks will be re-created", taskGroup.groupId);
return null;
log.info(
"Setting endOffsets for tasks in taskGroup[%d] to [%s]",
taskGroup.groupId, endOffsets
);
for (final String taskId : setEndOffsetTaskIds) {
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
}

return endOffsets;
return Futures.transform(
FutureUtils.coalesce(setEndOffsetFutures),
results -> {
try {
for (int i = 0; i < results.size(); i++) {
if (results.get(i).isValue() && Boolean.valueOf(true).equals(results.get(i).valueOrThrow())) {
log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i));
} else {
String taskId = setEndOffsetTaskIds.get(i);
killTask(taskId, "Failed to set end offsets, killing task");
taskGroup.tasks.remove(taskId);
}
}
}
catch (Exception e) {
log.error("An exception occurred while setting end offsets: [%s]", e.getMessage());
throw new RuntimeException(e);
}

if (taskGroup.tasks.isEmpty()) {
log.info("All tasks in taskGroup[%d] have failed, tasks will be re-created", taskGroup.groupId);
return null;
}

return endOffsets;
},
workerExec
);
}
},
workerExec
