From a8906b6ea0cef7f496f2a82c9c45308d940e7c79 Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Fri, 3 Nov 2023 21:28:46 -0400 Subject: [PATCH] Fix k8s task runner failure reporting (#15311) * Fix k8s task runner failure reporting * Fix reference * add jsonignore * PR changes --- .../common/actions/UpdateStatusAction.java | 30 +++++-- .../indexing/common/task/AbstractTask.java | 17 ++-- .../actions/UpdateStatusActionTest.java | 26 +++++- .../common/task/AbstractTaskTest.java | 79 ++++++++++++++++++- 4 files changed, 135 insertions(+), 17 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java index 55199ac9d0d1..0f1fd445c6e2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java @@ -34,13 +34,25 @@ public class UpdateStatusAction implements TaskAction { @JsonIgnore private final String status; + @JsonIgnore + private final TaskStatus statusFull; + + @Deprecated + public UpdateStatusAction( + String status + ) + { + this(status, null); + } @JsonCreator public UpdateStatusAction( - @JsonProperty("status") String status + @JsonProperty("status") String status, + @JsonProperty("statusFull") TaskStatus statusFull ) { this.status = status; + this.statusFull = statusFull; } @@ -50,6 +62,12 @@ public String getStatus() return status; } + @JsonProperty + public TaskStatus getStatusFull() + { + return statusFull; + } + @Override public TypeReference getReturnTypeReference() { @@ -63,9 +81,8 @@ public Void perform(Task task, TaskActionToolbox toolbox) { Optional taskRunner = toolbox.getTaskRunner(); if (taskRunner.isPresent()) { - TaskStatus result = "successful".equals(status) - ? TaskStatus.success(task.getId()) - : TaskStatus.failure(task.getId(), "Error with task"); + // Fall back to checking status instead of statusFull for backwards compatibility + TaskStatus result = statusFull != null ? statusFull : "successful".equals(status) ? TaskStatus.success(task.getId()) : TaskStatus.failure(task.getId(), "Error with task"); taskRunner.get().updateStatus(task, result); } return null; @@ -82,6 +99,7 @@ public String toString() { return "UpdateStatusAction{" + "status=" + status + + ", statusFull=" + statusFull + '}'; } @@ -95,12 +113,12 @@ public boolean equals(Object o) return false; } UpdateStatusAction that = (UpdateStatusAction) o; - return Objects.equals(status, that.status); + return Objects.equals(status, that.status) && Objects.equals(statusFull, that.statusFull); } @Override public int hashCode() { - return Objects.hash(status); + return Objects.hash(status, statusFull); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index ea0cb566b2ce..cd17160f3aae 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -159,12 +159,13 @@ public String setup(TaskToolbox toolbox) throws Exception @Override public final TaskStatus run(TaskToolbox taskToolbox) throws Exception { - TaskStatus taskStatus = TaskStatus.running(getId()); + TaskStatus taskStatus = null; try { cleanupCompletionLatch = new CountDownLatch(1); String errorMessage = setup(taskToolbox); if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) { - return TaskStatus.failure(getId(), errorMessage); + taskStatus = TaskStatus.failure(getId(), errorMessage); + return taskStatus; } taskStatus = runTask(taskToolbox); return taskStatus; @@ -186,7 +187,7 @@ public final TaskStatus run(TaskToolbox taskToolbox) throws Exception public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception; @Override - public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception + public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws Exception { // clear any interrupted status to ensure subsequent cleanup proceeds without interruption. Thread.interrupted(); @@ -196,11 +197,11 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception return; } + TaskStatus taskStatusToReport = taskStatus == null + ? TaskStatus.failure(id, "Task failed to run") + : taskStatus; // report back to the overlord - UpdateStatusAction status = new UpdateStatusAction("successful"); - if (taskStatus.isFailure()) { - status = new UpdateStatusAction("failure"); - } + UpdateStatusAction status = new UpdateStatusAction("", taskStatusToReport); toolbox.getTaskActionClient().submit(status); if (reportsFile != null && reportsFile.exists()) { @@ -211,7 +212,7 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception } if (statusFile != null) { - toolbox.getJsonMapper().writeValue(statusFile, taskStatus); + toolbox.getJsonMapper().writeValue(statusFile, taskStatusToReport); toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile); Files.deleteIfExists(statusFile.toPath()); log.debug("Pushed task status"); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateStatusActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateStatusActionTest.java index 84a40b536f3a..ab855944325c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateStatusActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateStatusActionTest.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.TaskRunner; +import org.junit.Assert; import org.junit.Test; import static org.mockito.ArgumentMatchers.any; @@ -38,6 +39,8 @@ public class UpdateStatusActionTest { + private static final String ID = "id"; + @Test public void testActionCallsTaskRunner() { @@ -62,10 +65,23 @@ public void testFailureScenario() verify(runner, times(1)).updateStatus(eq(task), eq(TaskStatus.failure(task.getId(), "Error with task"))); } + @Test + public void testTaskStatusFull() + { + Task task = NoopTask.create(); + TaskActionToolbox toolbox = mock(TaskActionToolbox.class); + TaskRunner runner = mock(TaskRunner.class); + when(toolbox.getTaskRunner()).thenReturn(Optional.of(runner)); + TaskStatus taskStatus = TaskStatus.failure(task.getId(), "custom error message"); + UpdateStatusAction action = new UpdateStatusAction("failure", taskStatus); + action.perform(task, toolbox); + verify(runner, times(1)).updateStatus(eq(task), eq(taskStatus)); + } + @Test public void testNoTaskRunner() { - UpdateStatusAction action = new UpdateStatusAction("successful"); + UpdateStatusAction action = new UpdateStatusAction("", TaskStatus.success(ID)); Task task = NoopTask.create(); TaskActionToolbox toolbox = mock(TaskActionToolbox.class); TaskRunner runner = mock(TaskRunner.class); @@ -73,4 +89,12 @@ public void testNoTaskRunner() action.perform(task, toolbox); verify(runner, never()).updateStatus(any(), any()); } + + @Test + public void testEquals() + { + UpdateStatusAction one = new UpdateStatusAction("", TaskStatus.failure(ID, "error")); + UpdateStatusAction two = new UpdateStatusAction("", TaskStatus.failure(ID, "error")); + Assert.assertEquals(one, two); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java index 39b0bdfcfc50..bcd2f086fd06 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java @@ -182,19 +182,94 @@ public void testTaskFailureWithoutExceptionGetsReportedCorrectly() throws Except when(taskActionClient.submit(any())).thenReturn(TaskConfig.class); when(toolbox.getTaskActionClient()).thenReturn(taskActionClient); + TaskStatus taskStatus = TaskStatus.failure("myId", "failed"); AbstractTask task = new NoopTask("myID", null, null, 1, 0, null) { @Override public TaskStatus runTask(TaskToolbox toolbox) { - return TaskStatus.failure("myId", "failed"); + return taskStatus; } }; task.run(toolbox); - UpdateStatusAction action = new UpdateStatusAction("failure"); + UpdateStatusAction action = new UpdateStatusAction("", taskStatus); verify(taskActionClient).submit(eq(action)); } + @Test + public void testNullStackStatusGetsReportedCorrectly() throws Exception + { + TaskToolbox toolbox = mock(TaskToolbox.class); + when(toolbox.getAttemptId()).thenReturn("1"); + + DruidNode node = new DruidNode("foo", "foo", false, 1, 2, true, true); + when(toolbox.getTaskExecutorNode()).thenReturn(node); + + TaskLogPusher pusher = mock(TaskLogPusher.class); + when(toolbox.getTaskLogPusher()).thenReturn(pusher); + + TaskConfig config = mock(TaskConfig.class); + when(config.isEncapsulatedTask()).thenReturn(true); + File folder = temporaryFolder.newFolder(); + when(config.getTaskDir(eq("myID"))).thenReturn(folder); + when(toolbox.getConfig()).thenReturn(config); + when(toolbox.getJsonMapper()).thenReturn(objectMapper); + + TaskActionClient taskActionClient = mock(TaskActionClient.class); + when(taskActionClient.submit(any())).thenReturn(TaskConfig.class); + when(toolbox.getTaskActionClient()).thenReturn(taskActionClient); + AbstractTask task = new NoopTask("myID", null, null, 1, 0, null) + { + @Nullable + @Override + public TaskStatus runTask(TaskToolbox toolbox) + { + // Simulate the scenario where taskStatus is never set and cleanUp is called with null. + return null; + } + }; + task.run(toolbox); + UpdateStatusAction action = new UpdateStatusAction("", TaskStatus.failure(task.getId(), "Task failed to run")); + verify(taskActionClient).submit(eq(action)); + } + + @Test + public void testSetupFailsGetsReportedCorrectly() throws Exception + { + TaskToolbox toolbox = mock(TaskToolbox.class); + when(toolbox.getAttemptId()).thenReturn("1"); + + DruidNode node = new DruidNode("foo", "foo", false, 1, 2, true, true); + when(toolbox.getTaskExecutorNode()).thenReturn(node); + + TaskLogPusher pusher = mock(TaskLogPusher.class); + when(toolbox.getTaskLogPusher()).thenReturn(pusher); + + TaskConfig config = mock(TaskConfig.class); + when(config.isEncapsulatedTask()).thenReturn(true); + File folder = temporaryFolder.newFolder(); + when(config.getTaskDir(eq("myID"))).thenReturn(folder); + when(toolbox.getConfig()).thenReturn(config); + when(toolbox.getJsonMapper()).thenReturn(objectMapper); + + TaskActionClient taskActionClient = mock(TaskActionClient.class); + when(taskActionClient.submit(any())).thenReturn(TaskConfig.class); + when(toolbox.getTaskActionClient()).thenReturn(taskActionClient); + AbstractTask task = new NoopTask("myID", null, null, 1, 0, null) + { + @Nullable + @Override + public String setup(TaskToolbox toolbox) + { + return "setup error"; + } + }; + task.run(toolbox); + UpdateStatusAction action = new UpdateStatusAction("", TaskStatus.failure(task.getId(), "setup error")); + verify(taskActionClient).submit(eq(action)); + } + + @Test public void testBatchIOConfigAppend() {