From a8c4511a56d25dfd3ffec3c6e804e4c0f8d0b323 Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Tue, 17 Oct 2023 08:17:43 -0700 Subject: [PATCH] Separate task lifecycle from kubernetes/location lifecycle (#15133) * Separate k8s and druid task lifecycles * Remove extra log lines * Fix unit tests * fix unit tests * Fix unit tests * notify listeners on task completion * Fix unit test * unused var * PR changes * Fix unit tests * Fix checkstyle * PR changes --- .../KubernetesAndWorkerTaskRunner.java | 6 - .../k8s/overlord/KubernetesPeonLifecycle.java | 25 +++- .../KubernetesPeonLifecycleFactory.java | 10 +- .../k8s/overlord/KubernetesTaskRunner.java | 47 ++++-- .../overlord/KubernetesTaskRunnerConfig.java | 2 +- .../k8s/overlord/KubernetesWorkItem.java | 23 ++- .../k8s/overlord/PeonLifecycleFactory.java | 7 +- .../k8s/overlord/common/JobResponse.java | 9 +- .../overlord/common/KubernetesPeonClient.java | 6 +- .../druid/k8s/overlord/common/PeonPhase.java | 62 -------- .../KubernetesAndWorkerTaskRunnerTest.java | 9 -- .../overlord/KubernetesPeonLifecycleTest.java | 140 +++++++++++++----- .../KubernetesTaskRunnerConfigTest.java | 4 +- .../overlord/KubernetesTaskRunnerTest.java | 102 +++++++++++-- .../k8s/overlord/KubernetesWorkItemTest.java | 11 +- .../overlord/TestPeonLifecycleFactory.java | 7 +- .../k8s/overlord/common/JobResponseTest.java | 8 +- .../common/KubernetesPeonClientTest.java | 3 - .../k8s/overlord/common/PeonPhaseTest.java | 43 ------ .../DruidPeonClientIntegrationTest.java | 5 +- .../resources/expectedEphemeralOutput.yaml | 2 +- .../expectedMultiContainerOutput.yaml | 2 +- .../expectedMultiContainerOutputOrder.yaml | 2 +- .../src/test/resources/expectedNoopJob.yaml | 2 +- .../resources/expectedNoopJobLongIds.yaml | 2 +- .../resources/expectedNoopJobNoTaskJson.yaml | 2 +- .../resources/expectedNoopJobTlsEnabled.yaml | 2 +- .../src/test/resources/expectedPodSpec.yaml | 2 +- .../expectedSingleContainerOutput.yaml | 2 +- .../common/actions/UpdateLocationAction.java | 12 +- .../indexing/common/task/AbstractTask.java | 10 -- .../druid/indexing/overlord/TaskRunner.java | 5 - .../actions/UpdateLocationActionTest.java | 71 --------- .../common/task/AbstractTaskTest.java | 2 +- 34 files changed, 324 insertions(+), 323 deletions(-) delete mode 100644 extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java delete mode 100644 extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java delete mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java index 243f6626c664c..8c41772aea692 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java @@ -283,10 +283,4 @@ public void updateStatus(Task task, TaskStatus status) { kubernetesTaskRunner.updateStatus(task, status); } - - @Override - public void updateLocation(Task task, TaskLocation location) - { - kubernetesTaskRunner.updateLocation(task, location); - } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index 5c6c7c6b3ebe7..ea3c2a19d1c2a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -31,6 +31,9 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.indexing.overlord.TaskRunnerUtils; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; @@ -47,6 +50,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -89,6 +94,8 @@ protected enum State private final KubernetesPeonClient kubernetesClient; private final ObjectMapper mapper; private final TaskStateListener stateListener; + private final List> listeners; + @MonotonicNonNull private LogWatch logWatch; @@ -99,7 +106,8 @@ protected KubernetesPeonLifecycle( KubernetesPeonClient kubernetesClient, TaskLogs taskLogs, ObjectMapper mapper, - TaskStateListener stateListener + TaskStateListener stateListener, + List> listeners ) { this.taskId = new K8sTaskId(task); @@ -108,6 +116,7 @@ protected KubernetesPeonLifecycle( this.taskLogs = taskLogs; this.mapper = mapper; this.stateListener = stateListener; + this.listeners = listeners; } /** @@ -178,7 +187,11 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio { try { updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING); - + TaskRunnerUtils.notifyLocationChanged( + listeners, + task.getId(), + getTaskLocation() + ); JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion( taskId, timeout, @@ -190,12 +203,14 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio finally { try { saveLogs(); + shutdown(); } catch (Exception e) { - log.warn(e, "Log processing failed for task [%s]", taskId); + log.warn(e, "Cleanup failed for task [%s]", taskId); + } + finally { + stopTask(); } - - stopTask(); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java index bf4e3a712577a..2998f3fc9212a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java @@ -21,9 +21,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.tasklogs.TaskLogs; +import java.util.List; +import java.util.concurrent.Executor; + public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory { private final KubernetesPeonClient client; @@ -42,14 +47,15 @@ public KubernetesPeonLifecycleFactory( } @Override - public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener) + public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List> listeners) { return new KubernetesPeonLifecycle( task, client, taskLogs, mapper, - stateListener + stateListener, + listeners ); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index a0a29dcbbb926..56de37c379806 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -146,16 +146,20 @@ public Optional streamTaskLog(String taskid, long offset) public ListenableFuture run(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> { + ListenableFuture unused = exec.submit(() -> runTask(task)); + return new KubernetesWorkItem(task); + }).getResult(); } } protected ListenableFuture joinAsync(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> { + ListenableFuture unused = exec.submit(() -> joinTask(task)); + return new KubernetesWorkItem(task); + }).getResult(); } } @@ -172,10 +176,12 @@ private TaskStatus joinTask(Task task) @VisibleForTesting protected TaskStatus doTask(Task task, boolean run) { + TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task execution never started"); try { KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build( task, - this::emitTaskStateMetrics + this::emitTaskStateMetrics, + listeners ); synchronized (tasks) { @@ -188,7 +194,6 @@ protected TaskStatus doTask(Task task, boolean run) workItem.setKubernetesPeonLifecycle(peonLifecycle); } - TaskStatus taskStatus; if (run) { taskStatus = peonLifecycle.run( adapter.fromTask(task), @@ -201,15 +206,17 @@ protected TaskStatus doTask(Task task, boolean run) config.getTaskTimeout().toStandardDuration().getMillis() ); } - - updateStatus(task, taskStatus); - return taskStatus; } catch (Exception e) { log.error(e, "Task [%s] execution caught an exception", task.getId()); + taskStatus = TaskStatus.failure(task.getId(), "Could not start task execution"); throw new RuntimeException(e); } + finally { + updateStatus(task, taskStatus); + TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), TaskLocation.unknown()); + } } @VisibleForTesting @@ -242,13 +249,13 @@ protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String @Override public void updateStatus(Task task, TaskStatus status) { - TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); - } + KubernetesWorkItem workItem = tasks.get(task.getId()); + if (workItem != null && !workItem.getResult().isDone() && status.isComplete()) { + workItem.setResult(status); + } - @Override - public void updateLocation(Task task, TaskLocation location) - { - TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), location); + // Notify listeners even if the result is set to handle the shutdown case. + TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); } @Override @@ -417,6 +424,16 @@ public void registerListener(TaskRunnerListener listener, Executor executor) final Pair listenerPair = Pair.of(listener, executor); log.debug("Registered listener [%s]", listener.getListenerId()); listeners.add(listenerPair); + + for (Map.Entry entry : tasks.entrySet()) { + if (entry.getValue().isRunning()) { + TaskRunnerUtils.notifyLocationChanged( + ImmutableList.of(listenerPair), + entry.getKey(), + entry.getValue().getLocation() + ); + } + } } @Override diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index 0d67c55b30aa8..5fdcd9cfbb717 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -78,7 +78,7 @@ public class KubernetesTaskRunnerConfig @JsonProperty @NotNull // how long to wait for the jobs to be cleaned up. - private Period taskCleanupDelay = new Period("P2D"); + private Period taskCleanupDelay = new Period("PT1H"); @JsonProperty @NotNull diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 94d4bbb67f63a..b089b4dd2db0b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -19,9 +19,10 @@ package org.apache.druid.k8s.overlord; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -36,9 +37,18 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem private final Task task; private KubernetesPeonLifecycle kubernetesPeonLifecycle = null; - public KubernetesWorkItem(Task task, ListenableFuture statusFuture) + private final SettableFuture result; + + public KubernetesWorkItem(Task task) + { + this(task, SettableFuture.create()); + } + + @VisibleForTesting + public KubernetesWorkItem(Task task, SettableFuture result) { - super(task.getId(), statusFuture); + super(task.getId(), result); + this.result = result; this.task = task; } @@ -51,7 +61,7 @@ protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle k protected synchronized void shutdown() { - if (this.kubernetesPeonLifecycle != null) { + if (this.kubernetesPeonLifecycle != null && !result.isDone()) { this.kubernetesPeonLifecycle.startWatchingLogs(); this.kubernetesPeonLifecycle.shutdown(); } @@ -119,4 +129,9 @@ public Task getTask() { return task; } + + public void setResult(TaskStatus status) + { + result.set(status); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java index 2a234ebc5786b..2b180fb9dac0a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java @@ -20,8 +20,13 @@ package org.apache.druid.k8s.overlord; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.java.util.common.Pair; + +import java.util.List; +import java.util.concurrent.Executor; public interface PeonLifecycleFactory { - KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener); + KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List> listeners); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java index a7a8156468f69..26fe5b98f0bd9 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java @@ -32,12 +32,10 @@ public class JobResponse private static final EmittingLogger LOGGER = new EmittingLogger(JobResponse.class); private final Job job; - private final PeonPhase phase; - public JobResponse(@Nullable Job job, PeonPhase phase) + public JobResponse(@Nullable Job job) { this.job = job; - this.phase = phase; } public Job getJob() @@ -45,11 +43,6 @@ public Job getJob() return job; } - public PeonPhase getPhase() - { - return phase; - } - public long getJobDuration() { long duration = -1L; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java index 9fdc25fa64557..147ea732d0ca2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java @@ -101,13 +101,13 @@ public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, Time ); if (job == null) { log.info("K8s job for the task [%s] was not found. It can happen if the task was canceled", taskId); - return new JobResponse(null, PeonPhase.FAILED); + return new JobResponse(null); } if (job.getStatus().getSucceeded() != null) { - return new JobResponse(job, PeonPhase.SUCCEEDED); + return new JobResponse(job); } log.warn("Task %s failed with status %s", taskId, job.getStatus()); - return new JobResponse(job, PeonPhase.FAILED); + return new JobResponse(job); }); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java deleted file mode 100644 index 6efcd34872b8a..0000000000000 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.k8s.overlord.common; - -import io.fabric8.kubernetes.api.model.Pod; - -import java.util.Arrays; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; - -public enum PeonPhase -{ - PENDING("Pending"), - SUCCEEDED("Succeeded"), - FAILED("Failed"), - UNKNOWN("Unknown"), - RUNNING("Running"); - - private static final Map PHASE_MAP = Arrays.stream(PeonPhase.values()) - .collect(Collectors.toMap( - PeonPhase::getPhase, - Function.identity() - )); - private final String phase; - - PeonPhase(String phase) - { - this.phase = phase; - } - - public String getPhase() - { - return phase; - } - - public static PeonPhase getPhaseFor(Pod pod) - { - if (pod == null) { - return UNKNOWN; - } - return PHASE_MAP.get(pod.getStatus().getPhase()); - } - -} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java index af5a6c39bb0b3..80010b9e5396d 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java @@ -332,13 +332,4 @@ public void test_updateStatus() runner.updateStatus(task, TaskStatus.running(ID)); verifyAll(); } - - @Test - public void test_updateLocation() - { - kubernetesTaskRunner.updateLocation(task, TaskLocation.unknown()); - replayAll(); - runner.updateLocation(task, TaskLocation.unknown()); - verifyAll(); - } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 1c6e429a3dc3b..e984e449b2822 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -31,11 +32,12 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.k8s.overlord.common.JobResponse; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.K8sTestUtils; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; -import org.apache.druid.k8s.overlord.common.PeonPhase; import org.apache.druid.tasklogs.TaskLogs; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -50,6 +52,8 @@ import java.io.IOException; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -65,6 +69,8 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport @Mock LogWatch logWatch; @Mock KubernetesPeonLifecycle.TaskStateListener stateListener; + List> listeners = ImmutableList.of(); + private ObjectMapper mapper; private Task task; private K8sTaskId k8sTaskId; @@ -86,7 +92,8 @@ public void test_run() throws IOException kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ) { @Override @@ -131,7 +138,8 @@ public void test_run_useTaskManager() throws IOException kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ) { @Override @@ -175,7 +183,8 @@ public void test_run_whenCalledMultipleTimes_raisesIllegalStateException() throw kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ) { @Override @@ -224,7 +233,8 @@ public void test_run_whenExceptionRaised_setsRunnerTaskStateToNone() kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ) { @Override @@ -268,15 +278,19 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); EasyMock.expect(kubernetesClient.waitForPeonJobCompletion( EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(null, PeonPhase.FAILED)); + )).andReturn(new JobResponse(null)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()); + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); + EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -302,12 +316,15 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException @Test public void test_join() throws IOException { + Executor executor = EasyMock.mock(Executor.class); + TaskRunnerListener taskRunnerListener = EasyMock.mock(TaskRunnerListener.class); KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + ImmutableList.of(Pair.of(taskRunnerListener, executor)) ); Job job = new JobBuilder() @@ -325,8 +342,12 @@ public void test_join() throws IOException EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); + )).andReturn(new JobResponse(job)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( + Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) + ); + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of( IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8) )); @@ -338,6 +359,10 @@ public void test_join() throws IOException EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); + executor.execute(EasyMock.anyObject()); + EasyMock.expectLastCall(); + taskRunnerListener.locationChanged(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -359,7 +384,8 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); Job job = new JobBuilder() @@ -375,8 +401,15 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); + )).andReturn(new JobResponse(job)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + + // Only update the location the first time, second call doesn't reach this point in the logic + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( + Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) + ); + // Always try to delete the job + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true).times(2); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); @@ -419,7 +452,8 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); Job job = new JobBuilder() @@ -435,8 +469,12 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); + )).andReturn(new JobResponse(job)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( + Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) + ); + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -469,7 +507,8 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); Job job = new JobBuilder() @@ -485,8 +524,12 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); + )).andReturn(new JobResponse(job)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( + Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) + ); + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -519,7 +562,8 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); Job job = new JobBuilder() @@ -535,8 +579,11 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); + )).andReturn(new JobResponse(job)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( + Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) + ); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); @@ -549,6 +596,9 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th logWatch.close(); EasyMock.expectLastCall(); + // We should still try to cleanup the Job after + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); @@ -569,7 +619,8 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); EasyMock.expect(kubernetesClient.waitForPeonJobCompletion( @@ -578,6 +629,9 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr EasyMock.eq(TimeUnit.MILLISECONDS) )).andThrow(new RuntimeException()); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( + Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) + ); // We should still try to push logs EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); @@ -588,7 +642,7 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - + EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); @@ -608,7 +662,8 @@ public void test_shutdown_withNotStartedTaskState() kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); peonLifecycle.shutdown(); } @@ -621,7 +676,8 @@ public void test_shutdown_withPendingTaskState() throws NoSuchFieldException, Il kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING); @@ -642,7 +698,8 @@ public void test_shutdown_withRunningTaskState() throws NoSuchFieldException, Il kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -663,7 +720,8 @@ public void test_shutdown_withStoppedTaskState() throws NoSuchFieldException, Il kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED); @@ -678,7 +736,8 @@ public void test_streamLogs_withNotStartedTaskState() throws NoSuchFieldExceptio kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.NOT_STARTED); @@ -693,7 +752,8 @@ public void test_streamLogs_withPendingTaskState() throws NoSuchFieldException, kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING); @@ -708,7 +768,8 @@ public void test_streamLogs_withRunningTaskState() throws NoSuchFieldException, kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -731,7 +792,8 @@ public void test_streamLogs_withStoppedTaskState() throws NoSuchFieldException, kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED); @@ -747,7 +809,8 @@ public void test_getTaskLocation_withNotStartedTaskState_returnsUnknown() kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.NOT_STARTED); @@ -763,7 +826,8 @@ public void test_getTaskLocation_withPendingTaskState_returnsUnknown() kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING); @@ -779,7 +843,8 @@ public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnkn kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -801,7 +866,8 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithoutStatus_r kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -829,7 +895,8 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_retu kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -865,7 +932,8 @@ public void test_getTaskLocation_saveTaskLocation() kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -901,7 +969,8 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithT kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -938,7 +1007,8 @@ public void test_getTaskLocation_withStoppedTaskState_returnsUnknown() kubernetesClient, taskLogs, mapper, - stateListener + stateListener, + listeners ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED); EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java index 1f4a7281f649d..579b7539d818e 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java @@ -47,7 +47,7 @@ public void test_deserializable() throws IOException Assert.assertNull(config.getGraceTerminationPeriodSeconds()); Assert.assertTrue(config.isDisableClientProxy()); Assert.assertEquals(new Period("PT4H"), config.getTaskTimeout()); - Assert.assertEquals(new Period("P2D"), config.getTaskCleanupDelay()); + Assert.assertEquals(new Period("PT1H"), config.getTaskCleanupDelay()); Assert.assertEquals(new Period("PT10m"), config.getTaskCleanupInterval()); Assert.assertEquals(new Period("PT1H"), config.getTaskLaunchTimeout()); Assert.assertEquals(ImmutableList.of(), config.getPeonMonitors()); @@ -72,7 +72,7 @@ public void test_builder_preservesDefaults() Assert.assertNull(config.getGraceTerminationPeriodSeconds()); Assert.assertTrue(config.isDisableClientProxy()); Assert.assertEquals(new Period("PT4H"), config.getTaskTimeout()); - Assert.assertEquals(new Period("P2D"), config.getTaskCleanupDelay()); + Assert.assertEquals(new Period("PT1H"), config.getTaskCleanupDelay()); Assert.assertEquals(new Period("PT10m"), config.getTaskCleanupInterval()); Assert.assertEquals(new Period("PT1H"), config.getTaskLaunchTimeout()); Assert.assertEquals(ImmutableList.of(), config.getPeonMonitors()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 36a7b4cfcd9cd..e04ef6300362b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -28,8 +28,10 @@ import org.apache.commons.io.IOUtils; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; @@ -76,6 +78,9 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle; @Mock private ServiceEmitter emitter; + @Mock private Executor executor; + @Mock private TaskRunnerListener taskRunnerListener; + private KubernetesTaskRunnerConfig config; private KubernetesTaskRunner runner; private Task task; @@ -116,11 +121,7 @@ protected ListenableFuture joinAsync(Task task) { return tasks.computeIfAbsent( task.getId(), - k -> new KubernetesWorkItem( - task, - Futures.immediateFuture(TaskStatus.success(task.getId())) - ) - ).getResult(); + k -> new KubernetesWorkItem(task)).getResult(); } }; @@ -249,7 +250,7 @@ public void test_run_withExistingTask_returnsExistingWorkItem() } @Test - public void test_run_whenExceptionThrown_throwsRuntimeException() throws IOException + public void test_run_whenExceptionThrown_throwsRuntimeException() throws Exception { Job job = new JobBuilder() .withNewMetadata() @@ -269,11 +270,89 @@ public void test_run_whenExceptionThrown_throwsRuntimeException() throws IOExcep replayAll(); ListenableFuture future = runner.run(task); + TaskStatus taskStatus = future.get(); + Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); + Assert.assertEquals("Could not start task execution", taskStatus.getErrorMsg()); + verifyAll(); + } - Exception e = Assert.assertThrows(ExecutionException.class, future::get); - Assert.assertTrue(e.getCause() instanceof RuntimeException); + @Test + public void test_run_updateStatus() throws ExecutionException, InterruptedException + { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ); + KubernetesWorkItem workItem = new KubernetesWorkItem(task); + runner.tasks.put(task.getId(), workItem); + TaskStatus completeTaskStatus = TaskStatus.success(task.getId()); + + replayAll(); + runner.updateStatus(task, completeTaskStatus); verifyAll(); + + assertTrue(workItem.getResult().isDone()); + assertEquals(completeTaskStatus, workItem.getResult().get()); + } + + @Test + public void test_run_updateStatus_running() + { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ); + KubernetesWorkItem workItem = new KubernetesWorkItem(task); + runner.tasks.put(task.getId(), workItem); + TaskStatus runningTaskStatus = TaskStatus.running(task.getId()); + + replayAll(); + runner.updateStatus(task, runningTaskStatus); + verifyAll(); + + assertFalse(workItem.getResult().isDone()); + } + + @Test + public void test_registerListener_runningTask() + { + KubernetesTaskRunner runner = new KubernetesTaskRunner( + taskAdapter, + config, + peonClient, + httpClient, + new TestPeonLifecycleFactory(kubernetesPeonLifecycle), + emitter + ); + + KubernetesPeonLifecycle runningKubernetesPeonLifecycle = EasyMock.mock(KubernetesPeonLifecycle.class); + EasyMock.expect(runningKubernetesPeonLifecycle.getState()).andReturn(KubernetesPeonLifecycle.State.RUNNING); + EasyMock.expect(runningKubernetesPeonLifecycle.getTaskLocation()).andReturn(TaskLocation.unknown()); + KubernetesWorkItem workItem = new KubernetesWorkItem(task); + workItem.setKubernetesPeonLifecycle(runningKubernetesPeonLifecycle); + runner.tasks.put(task.getId(), workItem); + + Executor executor = EasyMock.mock(Executor.class); + TaskRunnerListener taskRunnerListener = EasyMock.mock(TaskRunnerListener.class); + executor.execute(EasyMock.anyObject()); + EasyMock.expectLastCall(); + taskRunnerListener.locationChanged(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall(); + + replayAll(); + EasyMock.replay(runningKubernetesPeonLifecycle); + runner.registerListener(taskRunnerListener, executor); + verifyAll(); + EasyMock.verify(runningKubernetesPeonLifecycle); } @Test @@ -303,16 +382,15 @@ public void test_join_withExistingTask_returnsExistingWorkItem() } @Test - public void test_join_whenExceptionThrown_throwsRuntimeException() + public void test_join_whenExceptionThrown_throwsRuntimeException() throws ExecutionException, InterruptedException { EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong())).andThrow(new IllegalStateException()); replayAll(); ListenableFuture future = runner.joinAsync(task); - - Exception e = Assert.assertThrows(ExecutionException.class, future::get); - Assert.assertTrue(e.getCause() instanceof RuntimeException); + TaskStatus taskStatus = future.get(); + Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); verifyAll(); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index 7d17193b1714c..f2f398658e05c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -56,6 +56,7 @@ public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException() null, null, null, + null, null )); @@ -66,6 +67,7 @@ public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException() null, null, null, + null, null )) ); @@ -80,6 +82,8 @@ public void test_shutdown_withoutKubernetesPeonLifecycle() @Test public void test_shutdown_withKubernetesPeonLifecycle() { + KubernetesWorkItem workItem = new KubernetesWorkItem(task); + kubernetesPeonLifecycle.shutdown(); EasyMock.expectLastCall(); kubernetesPeonLifecycle.startWatchingLogs(); @@ -87,7 +91,6 @@ public void test_shutdown_withKubernetesPeonLifecycle() replayAll(); workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle); - workItem.shutdown(); verifyAll(); } @@ -158,6 +161,7 @@ public void test_getRunnerTaskState_withKubernetesPeonLifecycle_returnsPending() null, null, null, + null, null )); @@ -172,6 +176,7 @@ public void test_getRunnerTaskState_withKubernetesPeonLifecycle_inPendingState_r null, null, null, + null, null ) { @Override @@ -194,6 +199,7 @@ public void test_getRunnerTaskState_withKubernetesPeonLifecycle_inRunningState_r null, null, null, + null, null ) { @Override @@ -216,6 +222,7 @@ public void test_getRunnerTaskState_withKubernetesPeonLifecycle_inStoppedState_r null, null, null, + null, null ) { @Override @@ -244,6 +251,7 @@ public void test_streamTaskLogs_withKubernetesPeonLifecycle() null, null, null, + null, null )); Assert.assertFalse(workItem.streamTaskLogs().isPresent()); @@ -263,6 +271,7 @@ public void test_getLocation_withKubernetesPeonLifecycle() null, null, null, + null, null )); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java index 8b8c43c0d71cb..fa0f79bc1d209 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java @@ -20,6 +20,11 @@ package org.apache.druid.k8s.overlord; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunnerListener; +import org.apache.druid.java.util.common.Pair; + +import java.util.List; +import java.util.concurrent.Executor; public class TestPeonLifecycleFactory implements PeonLifecycleFactory { @@ -31,7 +36,7 @@ public TestPeonLifecycleFactory(KubernetesPeonLifecycle kubernetesPeonLifecycle) } @Override - public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener) + public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List> listeners) { return kubernetesPeonLifecycle; } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java index 2e2043578aa1e..cf9f345fd7a53 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java @@ -39,7 +39,7 @@ void testCompletionTime() .endStatus() .build(); - JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED); + JobResponse response = new JobResponse(job); Assertions.assertEquals(58000L, response.getJobDuration()); } @@ -56,7 +56,7 @@ void testNoDuration() .endStatus() .build(); - JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED); + JobResponse response = new JobResponse(job); Assertions.assertEquals(-1, response.getJobDuration()); } @@ -70,7 +70,7 @@ void testMakingCodeCoverageHappy() .endMetadata() .build(); - JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED); + JobResponse response = new JobResponse(job); Assertions.assertEquals(-1, response.getJobDuration()); } @@ -78,7 +78,7 @@ void testMakingCodeCoverageHappy() @Test void testNullJob() { - JobResponse response = new JobResponse(null, PeonPhase.SUCCEEDED); + JobResponse response = new JobResponse(null); long duration = response.getJobDuration(); Assertions.assertEquals(-1, duration); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java index f6096b675d6c2..cde7faa473bcb 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java @@ -153,7 +153,6 @@ void test_waitForPeonJobCompletion_withSuccessfulJob_returnsJobResponseWithJobAn TimeUnit.SECONDS ); - Assertions.assertEquals(PeonPhase.SUCCEEDED, jobResponse.getPhase()); Assertions.assertNotNull(jobResponse.getJob()); } @@ -178,7 +177,6 @@ void test_waitForPeonJobCompletion_withFailedJob_returnsJobResponseWithJobAndFai TimeUnit.SECONDS ); - Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase()); Assertions.assertNotNull(jobResponse.getJob()); } @@ -191,7 +189,6 @@ void test_waitforPeonJobCompletion_withoutRunningJob_returnsJobResponseWithEmpty TimeUnit.SECONDS ); - Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase()); Assertions.assertNull(jobResponse.getJob()); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java deleted file mode 100644 index 3f6bd71312be1..0000000000000 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.k8s.overlord.common; - -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodStatus; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class PeonPhaseTest -{ - - @Test - void testGetPhaseForToMakeCoverageHappy() - { - Pod pod = mock(Pod.class); - PodStatus status = mock(PodStatus.class); - when(status.getPhase()).thenReturn("Succeeded"); - when(pod.getStatus()).thenReturn(status); - assertEquals(PeonPhase.UNKNOWN, PeonPhase.getPhaseFor(null)); - assertEquals(PeonPhase.SUCCEEDED, PeonPhase.getPhaseFor(pod)); - } -} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java index 0981616858838..2cc5bf15c65af 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java @@ -38,13 +38,11 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; -import org.apache.druid.k8s.overlord.common.JobResponse; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.K8sTestUtils; import org.apache.druid.k8s.overlord.common.KubernetesClientApi; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.common.PeonCommandContext; -import org.apache.druid.k8s.overlord.common.PeonPhase; import org.apache.druid.server.DruidNode; import org.apache.druid.server.log.StartupLoggingConfig; import org.junit.jupiter.api.BeforeEach; @@ -184,9 +182,8 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception assertEquals(task, taskFromPod); - JobResponse jobStatusResult = peonClient.waitForPeonJobCompletion(taskId, 2, TimeUnit.MINUTES); + peonClient.waitForPeonJobCompletion(taskId, 2, TimeUnit.MINUTES); thread.join(); - assertEquals(PeonPhase.SUCCEEDED, jobStatusResult.getPhase()); // as long as there were no exceptions we are good! assertEquals(expectedLogs, actualLogs); // cleanup my job diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml index 30960cdbc6685..741a032eb6c9a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml @@ -62,4 +62,4 @@ spec: ephemeral-storage: 1Gi hostname: "id-3e70afe5cd823dfc7dd308eea616426b" restartPolicy: "Never" - ttlSecondsAfterFinished: 172800 \ No newline at end of file + ttlSecondsAfterFinished: 3600 \ No newline at end of file diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml index 70b8b7c1d2428..73f31ddfb508d 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml @@ -105,4 +105,4 @@ spec: name: "graveyard" - emptyDir: {} name: "kubexit" - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml index 70b8b7c1d2428..73f31ddfb508d 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml @@ -105,4 +105,4 @@ spec: name: "graveyard" - emptyDir: {} name: "kubexit" - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml index 2cef837f39724..004fed9585afa 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml @@ -16,7 +16,7 @@ metadata: spec: activeDeadlineSeconds: 14400 backoffLimit: 0 - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 template: metadata: labels: diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml index cf16c49c5db1d..b6ca8a2cefe6a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml @@ -16,7 +16,7 @@ metadata: spec: activeDeadlineSeconds: 14400 backoffLimit: 0 - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 template: metadata: labels: diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml index d72d0ef37b03b..8ecdaf50b0122 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml @@ -16,7 +16,7 @@ metadata: spec: activeDeadlineSeconds: 14400 backoffLimit: 0 - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 template: metadata: labels: diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml index a230ac913a60d..547887e90847d 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml @@ -16,7 +16,7 @@ metadata: spec: activeDeadlineSeconds: 14400 backoffLimit: 0 - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 template: metadata: labels: diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml index e46de13378835..ecd9416c563a3 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml @@ -104,4 +104,4 @@ spec: name: "graveyard" - emptyDir: {} name: "kubexit" - ttlSecondsAfterFinished: 172800 + ttlSecondsAfterFinished: 3600 diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml index f270368fb5524..7afc393c56afe 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml @@ -57,4 +57,4 @@ spec: cpu: "1000m" memory: "2400000000" restartPolicy: "Never" - ttlSecondsAfterFinished: 172800 \ No newline at end of file + ttlSecondsAfterFinished: 3600 \ No newline at end of file diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java index f4926864dcbec..088169d3a5372 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java @@ -23,11 +23,15 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.base.Optional; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskRunner; +/* This class was added for mm-less ingestion in order to let the peon manage its own location lifecycle by submitting +actions to the overlord. https://github.com/apache/druid/pull/15133 moved this location logic to the overlord itself +so this Action is no longer needed. For backwards compatibility with old peons, this class was left in but can be deprecated +for a later druid release. +*/ +@Deprecated public class UpdateLocationAction implements TaskAction { @JsonIgnore @@ -58,10 +62,6 @@ public TypeReference getReturnTypeReference() @Override public Void perform(Task task, TaskActionToolbox toolbox) { - Optional taskRunner = toolbox.getTaskRunner(); - if (taskRunner.isPresent()) { - taskRunner.get().updateLocation(task, taskLocation); - } return null; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index e5b6ab1b7312f..ea0cb566b2ce2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -25,13 +25,11 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; import org.apache.druid.common.utils.IdUtils; -import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.common.actions.UpdateLocationAction; import org.apache.druid.indexing.common.actions.UpdateStatusAction; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IAE; @@ -42,14 +40,12 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.segment.indexing.BatchIOConfig; -import org.apache.druid.server.DruidNode; import org.joda.time.Interval; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.net.InetAddress; import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; @@ -155,11 +151,6 @@ public String setup(TaskToolbox toolbox) throws Exception FileUtils.mkdirp(attemptDir); reportsFile = new File(attemptDir, "report.json"); statusFile = new File(attemptDir, "status.json"); - InetAddress hostName = InetAddress.getLocalHost(); - DruidNode node = toolbox.getTaskExecutorNode(); - toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.create( - hostName.getHostAddress(), node.getPlaintextPort(), node.getTlsPort(), node.isEnablePlaintextPort() - ))); } log.debug("Task setup complete"); return null; @@ -211,7 +202,6 @@ public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception status = new UpdateStatusAction("failure"); } toolbox.getTaskActionClient().submit(status); - toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.unknown())); if (reportsFile != null && reportsFile.exists()) { toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java index ac1fd124ef55f..99b0c05b8323e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java @@ -141,11 +141,6 @@ default void updateStatus(Task task, TaskStatus status) // do nothing } - default void updateLocation(Task task, TaskLocation location) - { - // do nothing - } - /** * The maximum number of tasks this TaskRunner can run concurrently. * Can return -1 if this method is not implemented or capacity can't be found. diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java deleted file mode 100644 index 83aeb382dc0a7..0000000000000 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.actions; - -import com.google.common.base.Optional; -import org.apache.druid.indexer.TaskLocation; -import org.apache.druid.indexing.common.task.NoopTask; -import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskRunner; -import org.junit.Test; - -import java.net.InetAddress; -import java.net.UnknownHostException; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class UpdateLocationActionTest -{ - @Test - public void testFlow() throws UnknownHostException - { - // get my task location - InetAddress hostName = InetAddress.getLocalHost(); - TaskLocation myLocation = TaskLocation.create(hostName.getHostAddress(), 1, 2); - UpdateLocationAction action = new UpdateLocationAction(myLocation); - Task task = NoopTask.create(); - TaskActionToolbox toolbox = mock(TaskActionToolbox.class); - TaskRunner runner = mock(TaskRunner.class); - when(toolbox.getTaskRunner()).thenReturn(Optional.of(runner)); - action.perform(task, toolbox); - verify(runner, times(1)).updateLocation(eq(task), eq(myLocation)); - } - - @Test - public void testWithNoTaskRunner() throws UnknownHostException - { - // get my task location - InetAddress hostName = InetAddress.getLocalHost(); - TaskLocation myLocation = TaskLocation.create(hostName.getHostAddress(), 1, 2); - UpdateLocationAction action = new UpdateLocationAction(myLocation); - Task task = NoopTask.create(); - TaskActionToolbox toolbox = mock(TaskActionToolbox.class); - TaskRunner runner = mock(TaskRunner.class); - when(toolbox.getTaskRunner()).thenReturn(Optional.absent()); - action.perform(task, toolbox); - verify(runner, never()).updateStatus(any(), any()); - } -} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java index 5bcadcfb71257..39b0bdfcfc50f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java @@ -109,7 +109,7 @@ public String setup(TaskToolbox toolbox) throws Exception task.run(toolbox); // call it 3 times, once to update location in setup, then one for status and location in cleanup - Mockito.verify(taskActionClient, times(3)).submit(any()); + Mockito.verify(taskActionClient, times(1)).submit(any()); verify(pusher, times(1)).pushTaskReports(eq("myID"), any()); verify(pusher, times(1)).pushTaskStatus(eq("myID"), any()); }