Skip to content

Commit

Permalink
Fix k8s task runner failure reporting (#15311)
Browse files Browse the repository at this point in the history
* Fix k8s task runner failure reporting

* Fix reference

* add jsonignore

* PR changes
  • Loading branch information
georgew5656 authored Nov 4, 2023
1 parent 5d39b94 commit a8906b6
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,25 @@ public class UpdateStatusAction implements TaskAction<Void>
{
@JsonIgnore
private final String status;
@JsonIgnore
private final TaskStatus statusFull;

/**
 * Legacy constructor that carries only the status string and no full {@link TaskStatus}.
 * Delegates to the two-argument constructor with a {@code null} {@code statusFull}, in which
 * case {@code perform} decides success or failure by comparing the string to "successful".
 *
 * @param status status string for the task
 * @deprecated use {@link #UpdateStatusAction(String, TaskStatus)} so the complete status is carried
 */
@Deprecated
public UpdateStatusAction(
String status
)
{
this(status, null);
}

@JsonCreator
public UpdateStatusAction(
@JsonProperty("status") String status
@JsonProperty("status") String status,
@JsonProperty("statusFull") TaskStatus statusFull
)
{
this.status = status;
this.statusFull = statusFull;
}


Expand All @@ -50,6 +62,12 @@ public String getStatus()
return status;
}

/**
 * Returns the full {@link TaskStatus} supplied to this action. The value is {@code null} when the
 * action was built through the single-argument constructor, which passes {@code null}.
 */
@JsonProperty
public TaskStatus getStatusFull()
{
return statusFull;
}

@Override
public TypeReference<Void> getReturnTypeReference()
{
Expand All @@ -63,9 +81,8 @@ public Void perform(Task task, TaskActionToolbox toolbox)
{
Optional<TaskRunner> taskRunner = toolbox.getTaskRunner();
if (taskRunner.isPresent()) {
TaskStatus result = "successful".equals(status)
? TaskStatus.success(task.getId())
: TaskStatus.failure(task.getId(), "Error with task");
// Fall back to checking status instead of statusFull for backwards compatibility
TaskStatus result = statusFull != null ? statusFull : "successful".equals(status) ? TaskStatus.success(task.getId()) : TaskStatus.failure(task.getId(), "Error with task");
taskRunner.get().updateStatus(task, result);
}
return null;
Expand All @@ -82,6 +99,7 @@ public String toString()
{
return "UpdateStatusAction{" +
"status=" + status +
", statusFull=" + statusFull +
'}';
}

Expand All @@ -95,12 +113,12 @@ public boolean equals(Object o)
return false;
}
UpdateStatusAction that = (UpdateStatusAction) o;
return Objects.equals(status, that.status);
return Objects.equals(status, that.status) && Objects.equals(statusFull, that.statusFull);
}

@Override
public int hashCode()
{
return Objects.hash(status);
return Objects.hash(status, statusFull);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,13 @@ public String setup(TaskToolbox toolbox) throws Exception
@Override
public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
{
TaskStatus taskStatus = TaskStatus.running(getId());
TaskStatus taskStatus = null;
try {
cleanupCompletionLatch = new CountDownLatch(1);
String errorMessage = setup(taskToolbox);
if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
return TaskStatus.failure(getId(), errorMessage);
taskStatus = TaskStatus.failure(getId(), errorMessage);
return taskStatus;
}
taskStatus = runTask(taskToolbox);
return taskStatus;
Expand All @@ -186,7 +187,7 @@ public final TaskStatus run(TaskToolbox taskToolbox) throws Exception
public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception;

@Override
public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception
{
// clear any interrupted status to ensure subsequent cleanup proceeds without interruption.
Thread.interrupted();
Expand All @@ -196,11 +197,11 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
return;
}

TaskStatus taskStatusToReport = taskStatus == null
? TaskStatus.failure(id, "Task failed to run")
: taskStatus;
// report back to the overlord
UpdateStatusAction status = new UpdateStatusAction("successful");
if (taskStatus.isFailure()) {
status = new UpdateStatusAction("failure");
}
UpdateStatusAction status = new UpdateStatusAction("", taskStatusToReport);
toolbox.getTaskActionClient().submit(status);

if (reportsFile != null && reportsFile.exists()) {
Expand All @@ -211,7 +212,7 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception
}

if (statusFile != null) {
toolbox.getJsonMapper().writeValue(statusFile, taskStatus);
toolbox.getJsonMapper().writeValue(statusFile, taskStatusToReport);
toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile);
Files.deleteIfExists(statusFile.toPath());
log.debug("Pushed task status");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.junit.Assert;
import org.junit.Test;

import static org.mockito.ArgumentMatchers.any;
Expand All @@ -38,6 +39,8 @@
public class UpdateStatusActionTest
{

private static final String ID = "id";

@Test
public void testActionCallsTaskRunner()
{
Expand All @@ -62,15 +65,36 @@ public void testFailureScenario()
verify(runner, times(1)).updateStatus(eq(task), eq(TaskStatus.failure(task.getId(), "Error with task")));
}

@Test
public void testTaskStatusFull()
{
  // Arrange: a toolbox that exposes a task runner.
  final TaskRunner taskRunner = mock(TaskRunner.class);
  final TaskActionToolbox actionToolbox = mock(TaskActionToolbox.class);
  when(actionToolbox.getTaskRunner()).thenReturn(Optional.of(taskRunner));

  final Task noopTask = NoopTask.create();
  final TaskStatus expectedStatus = TaskStatus.failure(noopTask.getId(), "custom error message");

  // Act: the full status is supplied alongside the legacy status string.
  new UpdateStatusAction("failure", expectedStatus).perform(noopTask, actionToolbox);

  // Assert: the runner receives the full status itself, custom message included.
  verify(taskRunner, times(1)).updateStatus(eq(noopTask), eq(expectedStatus));
}

@Test
public void testNoTaskRunner()
{
UpdateStatusAction action = new UpdateStatusAction("successful");
UpdateStatusAction action = new UpdateStatusAction("", TaskStatus.success(ID));
Task task = NoopTask.create();
TaskActionToolbox toolbox = mock(TaskActionToolbox.class);
TaskRunner runner = mock(TaskRunner.class);
when(toolbox.getTaskRunner()).thenReturn(Optional.absent());
action.perform(task, toolbox);
verify(runner, never()).updateStatus(any(), any());
}

/**
 * Verifies the equals/hashCode pair of {@link UpdateStatusAction}: actions built from equal
 * arguments are equal and share a hash code, and actions whose full status differs are not equal.
 */
@Test
public void testEquals()
{
  UpdateStatusAction one = new UpdateStatusAction("", TaskStatus.failure(ID, "error"));
  UpdateStatusAction two = new UpdateStatusAction("", TaskStatus.failure(ID, "error"));
  Assert.assertEquals(one, two);
  // Equal objects must produce equal hash codes; hashCode includes statusFull just like equals.
  Assert.assertEquals(one.hashCode(), two.hashCode());

  // Same status string but a different full status: equals compares statusFull as well.
  UpdateStatusAction differentFullStatus = new UpdateStatusAction("", TaskStatus.success(ID));
  Assert.assertNotEquals(one, differentFullStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,19 +182,94 @@ public void testTaskFailureWithoutExceptionGetsReportedCorrectly() throws Except
when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);

TaskStatus taskStatus = TaskStatus.failure("myId", "failed");
AbstractTask task = new NoopTask("myID", null, null, 1, 0, null)
{
@Override
public TaskStatus runTask(TaskToolbox toolbox)
{
return TaskStatus.failure("myId", "failed");
return taskStatus;
}
};
task.run(toolbox);
UpdateStatusAction action = new UpdateStatusAction("failure");
UpdateStatusAction action = new UpdateStatusAction("", taskStatus);
verify(taskActionClient).submit(eq(action));
}

@Test
public void testNullStackStatusGetsReportedCorrectly() throws Exception
{
  // Task config: an encapsulated task whose directory is a fresh temporary folder.
  final File taskDir = temporaryFolder.newFolder();
  final TaskConfig taskConfig = mock(TaskConfig.class);
  when(taskConfig.isEncapsulatedTask()).thenReturn(true);
  when(taskConfig.getTaskDir(eq("myID"))).thenReturn(taskDir);

  // Collaborators handed out by the toolbox.
  final TaskLogPusher logPusher = mock(TaskLogPusher.class);
  final TaskActionClient actionClient = mock(TaskActionClient.class);
  when(actionClient.submit(any())).thenReturn(TaskConfig.class);
  final DruidNode executorNode = new DruidNode("foo", "foo", false, 1, 2, true, true);

  final TaskToolbox mockToolbox = mock(TaskToolbox.class);
  when(mockToolbox.getAttemptId()).thenReturn("1");
  when(mockToolbox.getTaskExecutorNode()).thenReturn(executorNode);
  when(mockToolbox.getTaskLogPusher()).thenReturn(logPusher);
  when(mockToolbox.getConfig()).thenReturn(taskConfig);
  when(mockToolbox.getJsonMapper()).thenReturn(objectMapper);
  when(mockToolbox.getTaskActionClient()).thenReturn(actionClient);

  final AbstractTask nullStatusTask = new NoopTask("myID", null, null, 1, 0, null)
  {
    @Nullable
    @Override
    public TaskStatus runTask(TaskToolbox toolbox)
    {
      // No status is ever produced, so cleanUp ends up being invoked with null.
      return null;
    }
  };

  nullStatusTask.run(mockToolbox);

  // The action submitted to the client must carry the fallback failure status.
  final TaskStatus fallbackStatus = TaskStatus.failure(nullStatusTask.getId(), "Task failed to run");
  verify(actionClient).submit(eq(new UpdateStatusAction("", fallbackStatus)));
}

@Test
public void testSetupFailsGetsReportedCorrectly() throws Exception
{
  // Toolbox basics: attempt id, executor node and log pusher.
  final TaskToolbox taskToolbox = mock(TaskToolbox.class);
  when(taskToolbox.getAttemptId()).thenReturn("1");
  when(taskToolbox.getTaskExecutorNode()).thenReturn(new DruidNode("foo", "foo", false, 1, 2, true, true));
  when(taskToolbox.getTaskLogPusher()).thenReturn(mock(TaskLogPusher.class));

  // Encapsulated-task configuration pointing at a temporary task directory.
  final TaskConfig encapsulatedConfig = mock(TaskConfig.class);
  when(encapsulatedConfig.isEncapsulatedTask()).thenReturn(true);
  final File workDir = temporaryFolder.newFolder();
  when(encapsulatedConfig.getTaskDir(eq("myID"))).thenReturn(workDir);
  when(taskToolbox.getConfig()).thenReturn(encapsulatedConfig);
  when(taskToolbox.getJsonMapper()).thenReturn(objectMapper);

  // Action client whose submissions are verified below.
  final TaskActionClient client = mock(TaskActionClient.class);
  when(client.submit(any())).thenReturn(TaskConfig.class);
  when(taskToolbox.getTaskActionClient()).thenReturn(client);

  final AbstractTask failingSetupTask = new NoopTask("myID", null, null, 1, 0, null)
  {
    @Nullable
    @Override
    public String setup(TaskToolbox toolbox)
    {
      // A non-blank return value from setup is treated by run() as an error message.
      return "setup error";
    }
  };

  failingSetupTask.run(taskToolbox);

  // The setup error message must be reported as the failure status.
  final UpdateStatusAction expectedAction =
      new UpdateStatusAction("", TaskStatus.failure(failingSetupTask.getId(), "setup error"));
  verify(client).submit(eq(expectedAction));
}


@Test
public void testBatchIOConfigAppend()
{
Expand Down

0 comments on commit a8906b6

Please sign in to comment.