Skip to content

Commit

Permalink
Fix log typos, clean up some kill messages in SeekableStreamSupervisor (apache#15424)

Changes:
- Fix log `Got end of partition marker for partition [%s] from task [%s] in discoverTasks`
by passing the arguments in the order the format string expects (partition first, then task ID)
- Simplify anonymous inner classes by replacing them with lambdas and `Comparator` chains
- Update kill task message from `Task [%s] failed to respond to [set end offsets]
 in a timely manner, killing task` to `Failed to set end offsets, killing task`
- Clean up tests
  • Loading branch information
kfaraz authored Nov 24, 2023
1 parent 3113e7b commit 67c7b62
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static final String TOPIC_PREFIX = "testTopic";
private static final String DATASOURCE = "testDS";
private static final int NUM_PARTITIONS = 3;
private static final int TEST_CHAT_THREADS = 3;
private static final long TEST_CHAT_RETRIES = 9L;
private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
Expand Down Expand Up @@ -2708,8 +2707,7 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception
).andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2);
taskQueue.shutdown(
EasyMock.contains("sequenceName-0"),
EasyMock.eq("Task [%s] failed to respond to [set end offsets] in a timely manner, killing task"),
EasyMock.contains("sequenceName-0")
EasyMock.eq("Failed to set end offsets, killing task")
);
EasyMock.expectLastCall().times(2);
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
false
);
private static final String DATASOURCE = "testDS";
private static final int TEST_CHAT_THREADS = 3;
private static final long TEST_CHAT_RETRIES = 9L;
private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
Expand Down Expand Up @@ -1094,7 +1093,7 @@ public void testKillBadPartitionAssignment() throws Exception


taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
taskQueue.shutdown("id4", "Task [%s] failed to return status, killing task", "id4");
taskQueue.shutdown("id4", "Task[%s] failed to return status, killing task", "id4");
replayAll();

supervisor.start();
Expand Down Expand Up @@ -2386,8 +2385,7 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception
).andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2);
taskQueue.shutdown(
EasyMock.contains("sequenceName-0"),
EasyMock.eq("Task [%s] failed to respond to [set end offsets] in a timely manner, killing task"),
EasyMock.contains("sequenceName-0")
EasyMock.eq("Failed to set end offsets, killing task")
);
EasyMock.expectLastCall().times(2);
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,24 +163,10 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy

private static final EmittingLogger log = new EmittingLogger(SeekableStreamSupervisor.class);

private static final Comparator<ParseExceptionReport> PARSE_EXCEPTION_REPORT_COMPARATOR =
new Comparator<ParseExceptionReport>()
{
@Override
public int compare(ParseExceptionReport o1, ParseExceptionReport o2)
{
int timeCompare = Long.compare(o1.getTimeOfExceptionMillis(), o2.getTimeOfExceptionMillis());
if (timeCompare != 0) {
return timeCompare;
}
int errorTypeCompare = StringComparators.LEXICOGRAPHIC.compare(o1.getErrorType(), o2.getErrorType());
if (errorTypeCompare != 0) {
return errorTypeCompare;
}

return StringComparators.LEXICOGRAPHIC.compare(o1.getInput(), o2.getInput());
}
};
private static final Comparator<ParseExceptionReport> PARSE_EXCEPTION_REPORT_COMPARATOR
= Comparator.comparingLong(ParseExceptionReport::getTimeOfExceptionMillis)
.thenComparing(ParseExceptionReport::getErrorType, StringComparators.LEXICOGRAPHIC)
.thenComparing(ParseExceptionReport::getInput, StringComparators.LEXICOGRAPHIC);

// Internal data structures
// --------------------------------------------------------
Expand Down Expand Up @@ -1890,7 +1876,6 @@ public void resetOffsetsInternal(@Nonnull final DataSourceMetadata dataSourceMet

}


private void killTask(final String id, String reasonFormat, Object... args)
{
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
Expand Down Expand Up @@ -1978,7 +1963,7 @@ private void discoverTasks() throws ExecutionException, InterruptedException
if (!inactivePartitionsInTask.isEmpty()) {
killTaskWithSuccess(
taskId,
"Task [%s] with partition set [%s] has inactive partitions [%s], stopping task.",
"Task[%s] with partition set[%s] has inactive partitions[%s], stopping task.",
taskId,
taskPartitions,
inactivePartitionsInTask
Expand Down Expand Up @@ -2046,9 +2031,8 @@ public Boolean apply(Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdT
SequenceOffsetType sequence = entry.getValue();
if (sequence.equals(getEndOfPartitionMarker())) {
log.info(
"Got end of partition marker for partition [%s] from task [%s] in discoverTasks, clearing partition offset to refetch from metadata..",
taskId,
partition
"Got end-of-partition(EOS) marker for partition[%s] from task[%s] in discoverTasks, clearing partition offset to refetch from metadata.",
partition, taskId
);
endOffsetsAreInvalid = true;
partitionOffsets.put(partition, getNotSetMarker());
Expand Down Expand Up @@ -2080,7 +2064,7 @@ public Boolean apply(Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdT
.keySet()) {
if (!taskGroupId.equals(getTaskGroupIdForPartition(partition))) {
log.warn(
"Stopping task [%s] which does not match the expected partition allocation",
"Stopping task[%s] as it does not match the current partition allocation.",
taskId
);

Expand All @@ -2091,10 +2075,7 @@ public Boolean apply(Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdT
// make sure the task's io and tuning configs match with the supervisor config
// if it is current then only create corresponding taskGroup if it does not exist
if (!isTaskCurrent(taskGroupId, taskId, activeTaskMap)) {
log.info(
"Stopping task [%s] which does not match the expected parameters and ingestion spec",
taskId
);
log.info("Stopping task[%s] as it does not match the current supervisor spec.", taskId);

// Returning false triggers a call to stopTask.
return false;
Expand Down Expand Up @@ -2127,8 +2108,7 @@ public Boolean apply(Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdT
if (prevTaskData != null) {
throw new ISE(
"taskGroup[%s] already exists for new task[%s]",
prevTaskData,
taskId
prevTaskData, taskId
);
}
verifySameSequenceNameForAllTasksInGroup(taskGroupId);
Expand All @@ -2138,7 +2118,7 @@ public Boolean apply(Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdT
}
catch (Throwable t) {
stateManager.recordThrowableEvent(t);
log.error(t, "Something bad while discovering task [%s]", taskId);
log.error(t, "An error occurred while discovering task[%s]", taskId);
return null;
}
}
Expand All @@ -2155,13 +2135,13 @@ public Boolean apply(Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdT
for (int i = 0; i < results.size(); i++) {
String taskId = futureTaskIds.get(i);
if (results.get(i).isError() || results.get(i).valueOrThrow() == null) {
killTask(taskId, "Task [%s] failed to return status, killing task", taskId);
killTask(taskId, "Task[%s] failed to return status, killing task", taskId);
} else if (Boolean.valueOf(false).equals(results.get(i).valueOrThrow())) {
// "return false" above means that we want to stop the task.
stopFutures.add(stopTask(taskId, false));
}
}
log.debug("Found [%d] seekablestream indexing tasks for dataSource [%s]", taskCount, dataSource);
log.debug("Found [%d] seekablestream indexing tasks for datasource[%s]", taskCount, dataSource);

if (!stopFutures.isEmpty()) {
coalesceAndAwait(stopFutures);
Expand Down Expand Up @@ -2237,34 +2217,25 @@ private void resumeAllActivelyReadingTasks()
}
}

final String killMsg =
"Killing forcefully as task could not be resumed in the first supervisor run after Overlord change.";
for (Map.Entry<String, ListenableFuture<Boolean>> entry : tasksToResume.entrySet()) {
String taskId = entry.getKey();
ListenableFuture<Boolean> future = entry.getValue();
future.addListener(
new Runnable()
{
@Override
public void run()
{
try {
if (entry.getValue().get()) {
log.info("Resumed task [%s] in first supervisor run.", taskId);
} else {
log.warn("Failed to resume task [%s] in first supervisor run.", taskId);
killTask(
taskId,
"Killing forcefully as task could not be resumed in the first supervisor run after Overlord change."
);
}
}
catch (Exception e) {
log.warn(e, "Failed to resume task [%s] in first supervisor run.", taskId);
killTask(
taskId,
"Killing forcefully as task could not be resumed in the first supervisor run after Overlord change."
);
() -> {
try {
if (entry.getValue().get()) {
log.info("Resumed task[%s] in first supervisor run.", taskId);
} else {
log.warn("Failed to resume task[%s] in first supervisor run.", taskId);
killTask(taskId, killMsg);
}
}
catch (Exception e) {
log.warn(e, "Failed to resume task[%s] in first supervisor run.", taskId);
killTask(taskId, killMsg);
}
},
workerExec
);
Expand Down Expand Up @@ -3190,7 +3161,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
for (Entry<PartitionIdType, SequenceOffsetType> entry : endOffsets.entrySet()) {
if (entry.getValue().equals(getEndOfPartitionMarker())) {
log.info(
"Got end of partition marker for partition [%s] in checkTaskDuration, not updating partition offset.",
"Got end-of-partition(EOS) marker for partition[%s] in checkTaskDuration, not updating partition offset.",
entry.getKey()
);
endOffsetsAreInvalid = true;
Expand All @@ -3213,7 +3184,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
for (String id : group.taskIds()) {
killTask(
id,
"All tasks in group [%s] failed to transition to publishing state",
"All tasks in group[%s] failed to transition to publishing state",
groupId
);
}
Expand Down Expand Up @@ -3343,9 +3314,8 @@ public Map<PartitionIdType, SequenceOffsetType> apply(List<Either<Throwable, Map
}

log.info(
"Setting endOffsets for tasks in taskGroup [%d] to %s",
taskGroup.groupId,
endOffsets
"Setting endOffsets for tasks in taskGroup[%d] to [%s]",
taskGroup.groupId, endOffsets
);
for (final String taskId : setEndOffsetTaskIds) {
setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
Expand All @@ -3357,22 +3327,18 @@ public Map<PartitionIdType, SequenceOffsetType> apply(List<Either<Throwable, Map
log.info("Successfully set endOffsets for task[%s] and resumed it", setEndOffsetTaskIds.get(i));
} else {
String taskId = setEndOffsetTaskIds.get(i);
killTask(
taskId,
"Task [%s] failed to respond to [set end offsets] in a timely manner, killing task",
taskId
);
killTask(taskId, "Failed to set end offsets, killing task");
taskGroup.tasks.remove(taskId);
}
}
}
catch (Exception e) {
log.error("Something bad happened [%s]", e.getMessage());
log.error("An exception occurred while setting end offsets: [%s]", e.getMessage());
throw new RuntimeException(e);
}

if (taskGroup.tasks.isEmpty()) {
log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId);
log.info("All tasks in taskGroup[%d] have failed, tasks will be re-created", taskGroup.groupId);
return null;
}

Expand Down
Loading

0 comments on commit 67c7b62

Please sign in to comment.