From 95b0de61d172a323aead25571b8b83b4ae0b7120 Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Fri, 25 Aug 2023 12:50:38 -0400 Subject: [PATCH] Move some lifecycle management from doTask -> shutdown for the mm-less task runner (#14895) * save work * Add synchronized * Don't shutdown in run * Adding unit tests * Cleanup lifecycle * Fix tests * remove newline --- .../k8s/overlord/KubernetesPeonLifecycle.java | 9 +-- .../k8s/overlord/KubernetesTaskRunner.java | 24 ++++--- .../k8s/overlord/KubernetesWorkItem.java | 9 --- .../overlord/KubernetesPeonLifecycleTest.java | 14 +---- .../overlord/KubernetesTaskRunnerTest.java | 62 +++++++++---------- .../k8s/overlord/KubernetesWorkItemTest.java | 2 - 6 files changed, 52 insertions(+), 68 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index f6b15f46bc58..4814d8cbb609 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -137,7 +137,6 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout) } catch (Exception e) { log.info("Failed to run task: %s", taskId.getOriginalTaskId()); - shutdown(); throw e; } finally { @@ -168,10 +167,9 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio finally { try { saveLogs(); - shutdown(); } catch (Exception e) { - log.warn(e, "Task [%s] cleanup failed", taskId); + log.warn(e, "Log processing failed for task [%s]", taskId); } stopTask(); @@ -188,7 +186,7 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio */ protected void shutdown() { - if 
(State.PENDING.equals(state.get()) || State.RUNNING.equals(state.get())) { + if (State.PENDING.equals(state.get()) || State.RUNNING.equals(state.get()) || State.STOPPED.equals(state.get())) { kubernetesClient.deletePeonJob(taskId); } } @@ -223,7 +221,7 @@ protected State getState() */ protected TaskLocation getTaskLocation() { - if (!State.RUNNING.equals(state.get())) { + if (State.PENDING.equals(state.get()) || State.NOT_STARTED.equals(state.get())) { log.debug("Can't get task location for non-running job. [%s]", taskId.getOriginalTaskId()); return TaskLocation.unknown(); } @@ -251,7 +249,6 @@ protected TaskLocation getTaskLocation() Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")), pod.getMetadata() != null ? pod.getMetadata().getName() : "" ); - log.info("K8s task %s is running at location %s", taskId.getOriginalTaskId(), taskLocation); } return taskLocation; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 9a4e4bcca629..24e21b0b4e0e 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -182,10 +182,6 @@ protected TaskStatus doTask(Task task, boolean run) KubernetesWorkItem workItem = tasks.get(task.getId()); if (workItem == null) { - throw new ISE("Task [%s] disappeared", task.getId()); - } - - if (workItem.isShutdownRequested()) { throw new ISE("Task [%s] has been shut down", task.getId()); } @@ -213,11 +209,6 @@ protected TaskStatus doTask(Task task, boolean run) log.error(e, "Task [%s] execution caught an exception", task.getId()); throw new RuntimeException(e); } - finally { - 
synchronized (tasks) { - tasks.remove(task.getId()); - } - } } @VisibleForTesting @@ -271,6 +262,10 @@ public void shutdown(String taskid, String reason) return; } + synchronized (tasks) { + tasks.remove(taskid); + } + workItem.shutdown(); } @@ -440,6 +435,17 @@ public Collection getPendingTasks() .collect(Collectors.toList()); } + @Override + public TaskLocation getTaskLocation(String taskId) + { + final KubernetesWorkItem workItem = tasks.get(taskId); + if (workItem == null) { + return TaskLocation.unknown(); + } else { + return workItem.getLocation(); + } + } + @Nullable @Override public RunnerTaskState getRunnerTaskState(String taskId) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 164a82b6ae27..94d4bbb67f63 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -30,13 +30,10 @@ import org.apache.druid.java.util.common.ISE; import java.io.InputStream; -import java.util.concurrent.atomic.AtomicBoolean; public class KubernetesWorkItem extends TaskRunnerWorkItem { private final Task task; - - private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); private KubernetesPeonLifecycle kubernetesPeonLifecycle = null; public KubernetesWorkItem(Task task, ListenableFuture statusFuture) @@ -53,7 +50,6 @@ protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle k protected synchronized void shutdown() { - this.shutdownRequested.set(true); if (this.kubernetesPeonLifecycle != null) { this.kubernetesPeonLifecycle.startWatchingLogs(); @@ -61,11 +57,6 @@ protected synchronized void shutdown() } } - protected boolean 
isShutdownRequested() - { - return shutdownRequested.get(); - } - protected boolean isPending() { return RunnerTaskState.PENDING.equals(getRunnerTaskState()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 3a017e5f74ff..084d1db62d2b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -198,9 +198,6 @@ protected synchronized TaskStatus join(long timeout) EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) )).andReturn(null); - EasyMock.expect(kubernetesClient.deletePeonJob( - new K8sTaskId(ID) - )).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID); EasyMock.expectLastCall().once(); @@ -245,7 +242,6 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); replayAll(); @@ -298,7 +294,6 @@ public void test_join() throws IOException EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -353,7 +348,6 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - 
EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -408,7 +402,6 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -459,7 +452,6 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -512,7 +504,6 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -554,8 +545,6 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr logWatch.close(); EasyMock.expectLastCall(); - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); - Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); @@ -908,8 +897,11 @@ public void test_getTaskLocation_withStoppedTaskState_returnsUnknown() stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once(); + replayAll(); Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation()); + verifyAll(); } private void 
setPeonLifecycleState(KubernetesPeonLifecycle peonLifecycle, KubernetesPeonLifecycle.State state) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index ca1fc641719e..bee3a533c7b8 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -152,8 +152,6 @@ public void test_run_withoutExistingTask() throws IOException, ExecutionExceptio Assert.assertEquals(taskStatus, future.get()); verifyAll(); - - Assert.assertFalse(runner.tasks.containsKey(task.getId())); } @Test @@ -191,8 +189,6 @@ public void test_run_whenExceptionThrown_throwsRuntimeException() throws IOExcep Assert.assertTrue(e.getCause() instanceof RuntimeException); verifyAll(); - - Assert.assertFalse(runner.tasks.containsKey(task.getId())); } @Test @@ -208,8 +204,6 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt Assert.assertEquals(taskStatus, future.get()); verifyAll(); - - Assert.assertFalse(runner.tasks.containsKey(task.getId())); } @Test @@ -236,28 +230,11 @@ public void test_join_whenExceptionThrown_throwsRuntimeException() Assert.assertTrue(e.getCause() instanceof RuntimeException); verifyAll(); - - Assert.assertFalse(runner.tasks.containsKey(task.getId())); - } - - @Test - public void test_doTask_withoutWorkItem_throwsRuntimeException() - { - Assert.assertThrows( - "Task [id] disappeared", - RuntimeException.class, - () -> runner.doTask(task, true) - ); } @Test public void test_doTask_whenShutdownRequested_throwsRuntimeException() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task, null); - workItem.shutdown(); - - 
runner.tasks.put(task.getId(), workItem); - Assert.assertThrows( "Task [id] has been shut down", RuntimeException.class, @@ -266,13 +243,7 @@ public void test_doTask_whenShutdownRequested_throwsRuntimeException() } @Test - public void test_shutdown_withoutExistingTask() - { - runner.shutdown(task.getId(), ""); - } - - @Test - public void test_shutdown_withExistingTask() + public void test_shutdown_withExistingTask_removesTaskFromMap() { KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { @Override @@ -282,7 +253,13 @@ protected synchronized void shutdown() }; runner.tasks.put(task.getId(), workItem); + runner.shutdown(task.getId(), ""); + Assert.assertTrue(runner.tasks.isEmpty()); + } + @Test + public void test_shutdown_withoutExistingTask() + { runner.shutdown(task.getId(), ""); } @@ -629,6 +606,30 @@ public TaskLocation getLocation() verifyAll(); } + @Test + public void test_getTaskLocation_withExistingTask() + { + KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) { + @Override + public TaskLocation getLocation() + { + return TaskLocation.create("host", 0, 1, false); + } + }; + + runner.tasks.put(task.getId(), workItem); + + TaskLocation taskLocation = runner.getTaskLocation(task.getId()); + Assert.assertEquals(TaskLocation.create("host", 0, 1, false), taskLocation); + } + + @Test + public void test_getTaskLocation_noTaskFound() + { + TaskLocation taskLocation = runner.getTaskLocation(task.getId()); + Assert.assertEquals(TaskLocation.unknown(), taskLocation); + } + @Test public void test_getTotalCapacity() { @@ -644,6 +645,5 @@ public void test_getUsedCapacity() Assert.assertEquals(1, runner.getUsedCapacity()); runner.tasks.remove(task.getId()); Assert.assertEquals(0, runner.getUsedCapacity()); - } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index 5f951770480f..d5cf2ea7252d 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -75,7 +75,6 @@ public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException() public void test_shutdown_withoutKubernetesPeonLifecycle() { workItem.shutdown(); - Assert.assertTrue(workItem.isShutdownRequested()); } @Test @@ -91,7 +90,6 @@ public void test_shutdown_withKubernetesPeonLifecycle() workItem.shutdown(); verifyAll(); - Assert.assertTrue(workItem.isShutdownRequested()); } @Test