diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java index d8f4e9d84f21..2c45a0ec7b89 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java @@ -285,4 +285,10 @@ public void updateStatus(Task task, TaskStatus status) { kubernetesTaskRunner.updateStatus(task, status); } + + @Override + public void updateLocation(Task task, TaskLocation location) + { + kubernetesTaskRunner.updateLocation(task, location); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index ea3c2a19d1c2..5c6c7c6b3ebe 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -31,9 +31,6 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskRunnerListener; -import org.apache.druid.indexing.overlord.TaskRunnerUtils; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; @@ -50,8 +47,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; -import java.util.List; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -94,8 +89,6 @@ protected enum State private final KubernetesPeonClient kubernetesClient; private final ObjectMapper mapper; private final TaskStateListener stateListener; - private final List> listeners; - @MonotonicNonNull private LogWatch logWatch; @@ -106,8 +99,7 @@ protected KubernetesPeonLifecycle( KubernetesPeonClient kubernetesClient, TaskLogs taskLogs, ObjectMapper mapper, - TaskStateListener stateListener, - List> listeners + TaskStateListener stateListener ) { this.taskId = new K8sTaskId(task); @@ -116,7 +108,6 @@ protected KubernetesPeonLifecycle( this.taskLogs = taskLogs; this.mapper = mapper; this.stateListener = stateListener; - this.listeners = listeners; } /** @@ -187,11 +178,7 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio { try { updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING); - TaskRunnerUtils.notifyLocationChanged( - listeners, - task.getId(), - getTaskLocation() - ); + JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion( taskId, timeout, @@ -203,14 +190,12 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio finally { try { saveLogs(); - shutdown(); } catch (Exception e) { - log.warn(e, "Cleanup failed for task [%s]", taskId); - } - finally { - stopTask(); + log.warn(e, "Log processing failed for task [%s]", taskId); } + + stopTask(); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java index 2998f3fc9212..bf4e3a712577 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleFactory.java @@ -21,14 +21,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskRunnerListener; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.tasklogs.TaskLogs; -import java.util.List; -import java.util.concurrent.Executor; - public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory { private final KubernetesPeonClient client; @@ -47,15 +42,14 @@ public KubernetesPeonLifecycleFactory( } @Override - public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List> listeners) + public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener) { return new KubernetesPeonLifecycle( task, client, taskLogs, mapper, - stateListener, - listeners + stateListener ); } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 56de37c37980..a0a29dcbbb92 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -146,20 +146,16 @@ public Optional streamTaskLog(String taskid, long offset) public ListenableFuture run(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> { - ListenableFuture unused = exec.submit(() -> runTask(task)); - return new KubernetesWorkItem(task); - }).getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))) + .getResult(); } } protected ListenableFuture joinAsync(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> { - ListenableFuture unused = exec.submit(() -> joinTask(task)); - return new KubernetesWorkItem(task); - }).getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))) + .getResult(); } } @@ -176,12 +172,10 @@ private TaskStatus joinTask(Task task) @VisibleForTesting protected TaskStatus doTask(Task task, boolean run) { - TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task execution never started"); try { KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build( task, - this::emitTaskStateMetrics, - listeners + this::emitTaskStateMetrics ); synchronized (tasks) { @@ -194,6 +188,7 @@ protected TaskStatus doTask(Task task, boolean run) workItem.setKubernetesPeonLifecycle(peonLifecycle); } + TaskStatus taskStatus; if (run) { taskStatus = peonLifecycle.run( adapter.fromTask(task), @@ -206,17 +201,15 @@ protected TaskStatus doTask(Task task, boolean run) config.getTaskTimeout().toStandardDuration().getMillis() ); } + + updateStatus(task, taskStatus); + return taskStatus; } catch (Exception e) { log.error(e, "Task [%s] execution caught an exception", task.getId()); - taskStatus = TaskStatus.failure(task.getId(), "Could not start task execution"); throw new RuntimeException(e); } - finally { - updateStatus(task, taskStatus); - TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), TaskLocation.unknown()); - } } @VisibleForTesting @@ -249,15 +242,15 @@ protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String @Override public void updateStatus(Task task, TaskStatus status) { - KubernetesWorkItem workItem = tasks.get(task.getId()); - if (workItem != null && !workItem.getResult().isDone() && status.isComplete()) { - workItem.setResult(status); - } - - // Notify listeners even if the result is set to handle the shutdown case. TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); } + @Override + public void updateLocation(Task task, TaskLocation location) + { + TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), location); + } + @Override public void shutdown(String taskid, String reason) { @@ -424,16 +417,6 @@ public void registerListener(TaskRunnerListener listener, Executor executor) final Pair listenerPair = Pair.of(listener, executor); log.debug("Registered listener [%s]", listener.getListenerId()); listeners.add(listenerPair); - - for (Map.Entry entry : tasks.entrySet()) { - if (entry.getValue().isRunning()) { - TaskRunnerUtils.notifyLocationChanged( - ImmutableList.of(listenerPair), - entry.getKey(), - entry.getValue().getLocation() - ); - } - } } @Override diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index 5fdcd9cfbb71..0d67c55b30aa 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -78,7 +78,7 @@ public class KubernetesTaskRunnerConfig @JsonProperty @NotNull // how long to wait for the jobs to be cleaned up. - private Period taskCleanupDelay = new Period("PT1H"); + private Period taskCleanupDelay = new Period("P2D"); @JsonProperty @NotNull diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index b089b4dd2db0..94d4bbb67f63 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -19,10 +19,9 @@ package org.apache.druid.k8s.overlord; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -37,18 +36,9 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem private final Task task; private KubernetesPeonLifecycle kubernetesPeonLifecycle = null; - private final SettableFuture result; - - public KubernetesWorkItem(Task task) - { - this(task, SettableFuture.create()); - } - - @VisibleForTesting - public KubernetesWorkItem(Task task, SettableFuture result) + public KubernetesWorkItem(Task task, ListenableFuture statusFuture) { - super(task.getId(), result); - this.result = result; + super(task.getId(), statusFuture); this.task = task; } @@ -61,7 +51,7 @@ protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle k protected synchronized void shutdown() { - if (this.kubernetesPeonLifecycle != null && !result.isDone()) { + if (this.kubernetesPeonLifecycle != null) { this.kubernetesPeonLifecycle.startWatchingLogs(); this.kubernetesPeonLifecycle.shutdown(); } @@ -129,9 +119,4 @@ public Task getTask() { return task; } - - public void setResult(TaskStatus status) - { - result.set(status); - } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java index 2b180fb9dac0..2a234ebc5786 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/PeonLifecycleFactory.java @@ -20,13 +20,8 @@ package org.apache.druid.k8s.overlord; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskRunnerListener; -import org.apache.druid.java.util.common.Pair; - -import java.util.List; -import java.util.concurrent.Executor; public interface PeonLifecycleFactory { - KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List> listeners); + KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java index 26fe5b98f0bd..a7a8156468f6 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/JobResponse.java @@ -32,10 +32,12 @@ public class JobResponse private static final EmittingLogger LOGGER = new EmittingLogger(JobResponse.class); private final Job job; + private final PeonPhase phase; - public JobResponse(@Nullable Job job) + public JobResponse(@Nullable Job job, PeonPhase phase) { this.job = job; + this.phase = phase; } public Job getJob() @@ -43,6 +45,11 @@ public Job getJob() return job; } + public PeonPhase getPhase() + { + return phase; + } + public long getJobDuration() { long duration = -1L; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java index 147ea732d0ca..9fdc25fa6455 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java @@ -101,13 +101,13 @@ public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, Time ); if (job == null) { log.info("K8s job for the task [%s] was not found. It can happen if the task was canceled", taskId); - return new JobResponse(null); + return new JobResponse(null, PeonPhase.FAILED); } if (job.getStatus().getSucceeded() != null) { - return new JobResponse(job); + return new JobResponse(job, PeonPhase.SUCCEEDED); } log.warn("Task %s failed with status %s", taskId, job.getStatus()); - return new JobResponse(job); + return new JobResponse(job, PeonPhase.FAILED); }); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java new file mode 100644 index 000000000000..6efcd34872b8 --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonPhase.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum PeonPhase +{ + PENDING("Pending"), + SUCCEEDED("Succeeded"), + FAILED("Failed"), + UNKNOWN("Unknown"), + RUNNING("Running"); + + private static final Map PHASE_MAP = Arrays.stream(PeonPhase.values()) + .collect(Collectors.toMap( + PeonPhase::getPhase, + Function.identity() + )); + private final String phase; + + PeonPhase(String phase) + { + this.phase = phase; + } + + public String getPhase() + { + return phase; + } + + public static PeonPhase getPhaseFor(Pod pod) + { + if (pod == null) { + return UNKNOWN; + } + return PHASE_MAP.get(pod.getStatus().getPhase()); + } + +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java index 40ca6fc2f2b8..3ab515cc6e55 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java @@ -362,4 +362,13 @@ public void test_updateStatus() runner.updateStatus(task, TaskStatus.running(ID)); verifyAll(); } + + @Test + public void test_updateLocation() + { + kubernetesTaskRunner.updateLocation(task, TaskLocation.unknown()); + replayAll(); + runner.updateLocation(task, TaskLocation.unknown()); + verifyAll(); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index e984e449b282..1c6e429a3dc3 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -32,12 +31,11 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskRunnerListener; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.k8s.overlord.common.JobResponse; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.K8sTestUtils; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; +import org.apache.druid.k8s.overlord.common.PeonPhase; import org.apache.druid.tasklogs.TaskLogs; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -52,8 +50,6 @@ import java.io.IOException; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -69,8 +65,6 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport @Mock LogWatch logWatch; @Mock KubernetesPeonLifecycle.TaskStateListener stateListener; - List> listeners = ImmutableList.of(); - private ObjectMapper mapper; private Task task; private K8sTaskId k8sTaskId; @@ -92,8 +86,7 @@ public void test_run() throws IOException kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ) { @Override @@ -138,8 +131,7 @@ public void test_run_useTaskManager() throws IOException kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ) { @Override @@ -183,8 +175,7 @@ public void test_run_whenCalledMultipleTimes_raisesIllegalStateException() throw kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ) { @Override @@ -233,8 +224,7 @@ public void test_run_whenExceptionRaised_setsRunnerTaskStateToNone() kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ) { @Override @@ -278,19 +268,15 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); EasyMock.expect(kubernetesClient.waitForPeonJobCompletion( EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(null)); + )).andReturn(new JobResponse(null, PeonPhase.FAILED)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); - EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()); - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); - EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -316,15 +302,12 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException @Test public void test_join() throws IOException { - Executor executor = EasyMock.mock(Executor.class); - TaskRunnerListener taskRunnerListener = EasyMock.mock(TaskRunnerListener.class); KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, kubernetesClient, taskLogs, mapper, - stateListener, - ImmutableList.of(Pair.of(taskRunnerListener, executor)) + stateListener ); Job job = new JobBuilder() @@ -342,12 +325,8 @@ public void test_join() throws IOException EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job)); + )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); - EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( - Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) - ); - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of( IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8) )); @@ -359,10 +338,6 @@ public void test_join() throws IOException EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - executor.execute(EasyMock.anyObject()); - EasyMock.expectLastCall(); - taskRunnerListener.locationChanged(EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -384,8 +359,7 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); Job job = new JobBuilder() @@ -401,15 +375,8 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job)); + )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); - - // Only update the location the first time, second call doesn't reach this point in the logic - EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( - Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) - ); - // Always try to delete the job - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true).times(2); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); @@ -452,8 +419,7 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); Job job = new JobBuilder() @@ -469,12 +435,8 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job)); + )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); - EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( - Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) - ); - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.absent()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -507,8 +469,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); Job job = new JobBuilder() @@ -524,12 +485,8 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job)); + )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); - EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( - Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) - ); - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andThrow(new IOException()); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); @@ -562,8 +519,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); Job job = new JobBuilder() @@ -579,11 +535,8 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th EasyMock.eq(k8sTaskId), EasyMock.anyLong(), EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job)); + )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); - EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( - Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) - ); EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn( Optional.of(IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)) ); @@ -596,9 +549,6 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th logWatch.close(); EasyMock.expectLastCall(); - // We should still try to cleanup the Job after - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); - Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); @@ -619,8 +569,7 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); EasyMock.expect(kubernetesClient.waitForPeonJobCompletion( @@ -629,9 +578,6 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr EasyMock.eq(TimeUnit.MILLISECONDS) )).andThrow(new RuntimeException()); - EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn( - Optional.of(new PodBuilder().withNewMetadata().withName("job-pod").endMetadata().withNewStatus().withPodIP("ip").endStatus().build()) - ); // We should still try to push logs EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); @@ -642,7 +588,7 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true); + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); @@ -662,8 +608,7 @@ public void test_shutdown_withNotStartedTaskState() kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); peonLifecycle.shutdown(); } @@ -676,8 +621,7 @@ public void test_shutdown_withPendingTaskState() throws NoSuchFieldException, Il kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING); @@ -698,8 +642,7 @@ public void test_shutdown_withRunningTaskState() throws NoSuchFieldException, Il kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -720,8 +663,7 @@ public void test_shutdown_withStoppedTaskState() throws NoSuchFieldException, Il kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED); @@ -736,8 +678,7 @@ public void test_streamLogs_withNotStartedTaskState() throws NoSuchFieldExceptio kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.NOT_STARTED); @@ -752,8 +693,7 @@ public void test_streamLogs_withPendingTaskState() throws NoSuchFieldException, kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING); @@ -768,8 +708,7 @@ public void test_streamLogs_withRunningTaskState() throws NoSuchFieldException, kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -792,8 +731,7 @@ public void test_streamLogs_withStoppedTaskState() throws NoSuchFieldException, kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED); @@ -809,8 +747,7 @@ public void test_getTaskLocation_withNotStartedTaskState_returnsUnknown() kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.NOT_STARTED); @@ -826,8 +763,7 @@ public void test_getTaskLocation_withPendingTaskState_returnsUnknown() kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.PENDING); @@ -843,8 +779,7 @@ public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnkn kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -866,8 +801,7 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithoutStatus_r kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -895,8 +829,7 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_retu kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -932,8 +865,7 @@ public void test_getTaskLocation_saveTaskLocation() kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -969,8 +901,7 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithT kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); @@ -1007,8 +938,7 @@ public void test_getTaskLocation_withStoppedTaskState_returnsUnknown() kubernetesClient, taskLogs, mapper, - stateListener, - listeners + stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED); EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once(); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java index 579b7539d818..1f4a7281f649 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfigTest.java @@ -47,7 +47,7 @@ public void test_deserializable() throws IOException Assert.assertNull(config.getGraceTerminationPeriodSeconds()); Assert.assertTrue(config.isDisableClientProxy()); Assert.assertEquals(new Period("PT4H"), config.getTaskTimeout()); - Assert.assertEquals(new Period("PT1H"), config.getTaskCleanupDelay()); + Assert.assertEquals(new Period("P2D"), config.getTaskCleanupDelay()); Assert.assertEquals(new Period("PT10m"), config.getTaskCleanupInterval()); Assert.assertEquals(new Period("PT1H"), config.getTaskLaunchTimeout()); Assert.assertEquals(ImmutableList.of(), config.getPeonMonitors()); @@ -72,7 +72,7 @@ public void test_builder_preservesDefaults() Assert.assertNull(config.getGraceTerminationPeriodSeconds()); Assert.assertTrue(config.isDisableClientProxy()); Assert.assertEquals(new Period("PT4H"), config.getTaskTimeout()); - Assert.assertEquals(new Period("PT1H"), config.getTaskCleanupDelay()); + Assert.assertEquals(new Period("P2D"), config.getTaskCleanupDelay()); Assert.assertEquals(new Period("PT10m"), config.getTaskCleanupInterval()); Assert.assertEquals(new Period("PT1H"), config.getTaskLaunchTimeout()); Assert.assertEquals(ImmutableList.of(), config.getPeonMonitors()); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index e04ef6300362..36a7b4cfcd9c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -28,10 +28,8 @@ import org.apache.commons.io.IOUtils; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; -import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; @@ -78,9 +76,6 @@ public class KubernetesTaskRunnerTest extends EasyMockSupport @Mock private KubernetesPeonLifecycle kubernetesPeonLifecycle; @Mock private ServiceEmitter emitter; - @Mock private Executor executor; - @Mock private TaskRunnerListener taskRunnerListener; - private KubernetesTaskRunnerConfig config; private KubernetesTaskRunner runner; private Task task; @@ -121,7 +116,11 @@ protected ListenableFuture joinAsync(Task task) { return tasks.computeIfAbsent( task.getId(), - k -> new KubernetesWorkItem(task)).getResult(); + k -> new KubernetesWorkItem( + task, + Futures.immediateFuture(TaskStatus.success(task.getId())) + ) + ).getResult(); } }; @@ -250,7 +249,7 @@ public void test_run_withExistingTask_returnsExistingWorkItem() } @Test - public void test_run_whenExceptionThrown_throwsRuntimeException() throws Exception + public void test_run_whenExceptionThrown_throwsRuntimeException() throws IOException { Job job = new JobBuilder() .withNewMetadata() @@ -270,89 +269,11 @@ public void test_run_whenExceptionThrown_throwsRuntimeException() throws Excepti replayAll(); ListenableFuture future = runner.run(task); - TaskStatus taskStatus = future.get(); - Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); - Assert.assertEquals("Could not start task execution", taskStatus.getErrorMsg()); - verifyAll(); - } - @Test - public void test_run_updateStatus() throws ExecutionException, InterruptedException - { - KubernetesTaskRunner runner = new KubernetesTaskRunner( - taskAdapter, - config, - peonClient, - httpClient, - new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter - ); + Exception e = Assert.assertThrows(ExecutionException.class, future::get); + Assert.assertTrue(e.getCause() instanceof RuntimeException); - KubernetesWorkItem workItem = new KubernetesWorkItem(task); - runner.tasks.put(task.getId(), workItem); - TaskStatus completeTaskStatus = TaskStatus.success(task.getId()); - - replayAll(); - runner.updateStatus(task, completeTaskStatus); verifyAll(); - - assertTrue(workItem.getResult().isDone()); - assertEquals(completeTaskStatus, workItem.getResult().get()); - } - - @Test - public void test_run_updateStatus_running() - { - KubernetesTaskRunner runner = new KubernetesTaskRunner( - taskAdapter, - config, - peonClient, - httpClient, - new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter - ); - KubernetesWorkItem workItem = new KubernetesWorkItem(task); - runner.tasks.put(task.getId(), workItem); - TaskStatus runningTaskStatus = TaskStatus.running(task.getId()); - - replayAll(); - runner.updateStatus(task, runningTaskStatus); - verifyAll(); - - assertFalse(workItem.getResult().isDone()); - } - - @Test - public void test_registerListener_runningTask() - { - KubernetesTaskRunner runner = new KubernetesTaskRunner( - taskAdapter, - config, - peonClient, - httpClient, - new TestPeonLifecycleFactory(kubernetesPeonLifecycle), - emitter - ); - - KubernetesPeonLifecycle runningKubernetesPeonLifecycle = EasyMock.mock(KubernetesPeonLifecycle.class); - EasyMock.expect(runningKubernetesPeonLifecycle.getState()).andReturn(KubernetesPeonLifecycle.State.RUNNING); - EasyMock.expect(runningKubernetesPeonLifecycle.getTaskLocation()).andReturn(TaskLocation.unknown()); - KubernetesWorkItem workItem = new KubernetesWorkItem(task); - workItem.setKubernetesPeonLifecycle(runningKubernetesPeonLifecycle); - runner.tasks.put(task.getId(), workItem); - - Executor executor = EasyMock.mock(Executor.class); - TaskRunnerListener taskRunnerListener = EasyMock.mock(TaskRunnerListener.class); - executor.execute(EasyMock.anyObject()); - EasyMock.expectLastCall(); - taskRunnerListener.locationChanged(EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall(); - - replayAll(); - EasyMock.replay(runningKubernetesPeonLifecycle); - runner.registerListener(taskRunnerListener, executor); - verifyAll(); - EasyMock.verify(runningKubernetesPeonLifecycle); } @Test @@ -382,15 +303,16 @@ public void test_join_withExistingTask_returnsExistingWorkItem() } @Test - public void test_join_whenExceptionThrown_throwsRuntimeException() throws ExecutionException, InterruptedException + public void test_join_whenExceptionThrown_throwsRuntimeException() { EasyMock.expect(kubernetesPeonLifecycle.join(EasyMock.anyLong())).andThrow(new IllegalStateException()); replayAll(); ListenableFuture future = runner.joinAsync(task); - TaskStatus taskStatus = future.get(); - Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); + + Exception e = Assert.assertThrows(ExecutionException.class, future::get); + Assert.assertTrue(e.getCause() instanceof RuntimeException); verifyAll(); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index f2f398658e05..7d17193b1714 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -56,7 +56,6 @@ public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException() null, null, null, - null, null )); @@ -67,7 +66,6 @@ public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException() null, null, null, - null, null )) ); @@ -82,8 +80,6 @@ public void test_shutdown_withoutKubernetesPeonLifecycle() @Test public void test_shutdown_withKubernetesPeonLifecycle() { - KubernetesWorkItem workItem = new KubernetesWorkItem(task); - kubernetesPeonLifecycle.shutdown(); EasyMock.expectLastCall(); kubernetesPeonLifecycle.startWatchingLogs(); @@ -91,6 +87,7 @@ public void test_shutdown_withKubernetesPeonLifecycle() replayAll(); workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle); + workItem.shutdown(); verifyAll(); } @@ -161,7 +158,6 @@ public void test_getRunnerTaskState_withKubernetesPeonLifecycle_returnsPending() null, null, null, - null, null )); @@ -176,7 +172,6 @@ public void test_getRunnerTaskState_withKubernetesPeonLifecycle_inPendingState_r null, null, null, - null, null ) { @Override @@ -199,7 +194,6 @@ public void test_getRunnerTaskState_withKubernetesPeonLifecycle_inRunningState_r null, null, null, - null, null ) { @Override @@ -222,7 +216,6 @@ public void test_getRunnerTaskState_withKubernetesPeonLifecycle_inStoppedState_r null, null, null, - null, null ) { @Override @@ -251,7 +244,6 @@ public void test_streamTaskLogs_withKubernetesPeonLifecycle() null, null, null, - null, null )); Assert.assertFalse(workItem.streamTaskLogs().isPresent()); @@ -271,7 +263,6 @@ public void test_getLocation_withKubernetesPeonLifecycle() null, null, null, - null, null )); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java index fa0f79bc1d20..8b8c43c0d71c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/TestPeonLifecycleFactory.java @@ -20,11 +20,6 @@ package org.apache.druid.k8s.overlord; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskRunnerListener; -import org.apache.druid.java.util.common.Pair; - -import java.util.List; -import java.util.concurrent.Executor; public class TestPeonLifecycleFactory implements PeonLifecycleFactory { @@ -36,7 +31,7 @@ public TestPeonLifecycleFactory(KubernetesPeonLifecycle kubernetesPeonLifecycle) } @Override - public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List> listeners) + public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener) { return kubernetesPeonLifecycle; } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java index cf9f345fd7a5..2e2043578aa1 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/JobResponseTest.java @@ -39,7 +39,7 @@ void testCompletionTime() .endStatus() .build(); - JobResponse response = new JobResponse(job); + JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED); Assertions.assertEquals(58000L, response.getJobDuration()); } @@ -56,7 +56,7 @@ void testNoDuration() .endStatus() .build(); - JobResponse response = new JobResponse(job); + JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED); Assertions.assertEquals(-1, response.getJobDuration()); } @@ -70,7 +70,7 @@ void testMakingCodeCoverageHappy() .endMetadata() .build(); - JobResponse response = new JobResponse(job); + JobResponse response = new JobResponse(job, PeonPhase.SUCCEEDED); Assertions.assertEquals(-1, response.getJobDuration()); } @@ -78,7 +78,7 @@ void testMakingCodeCoverageHappy() @Test void testNullJob() { - JobResponse response = new JobResponse(null); + JobResponse response = new JobResponse(null, PeonPhase.SUCCEEDED); long duration = response.getJobDuration(); Assertions.assertEquals(-1, duration); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java index cde7faa473bc..f6096b675d6c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java @@ -153,6 +153,7 @@ void test_waitForPeonJobCompletion_withSuccessfulJob_returnsJobResponseWithJobAn TimeUnit.SECONDS ); + Assertions.assertEquals(PeonPhase.SUCCEEDED, jobResponse.getPhase()); Assertions.assertNotNull(jobResponse.getJob()); } @@ -177,6 +178,7 @@ void test_waitForPeonJobCompletion_withFailedJob_returnsJobResponseWithJobAndFai TimeUnit.SECONDS ); + Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase()); Assertions.assertNotNull(jobResponse.getJob()); } @@ -189,6 +191,7 @@ void test_waitforPeonJobCompletion_withoutRunningJob_returnsJobResponseWithEmpty TimeUnit.SECONDS ); + Assertions.assertEquals(PeonPhase.FAILED, jobResponse.getPhase()); Assertions.assertNull(jobResponse.getJob()); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java new file mode 100644 index 000000000000..3f6bd71312be --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/PeonPhaseTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodStatus; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class PeonPhaseTest +{ + + @Test + void testGetPhaseForToMakeCoverageHappy() + { + Pod pod = mock(Pod.class); + PodStatus status = mock(PodStatus.class); + when(status.getPhase()).thenReturn("Succeeded"); + when(pod.getStatus()).thenReturn(status); + assertEquals(PeonPhase.UNKNOWN, PeonPhase.getPhaseFor(null)); + assertEquals(PeonPhase.SUCCEEDED, PeonPhase.getPhaseFor(pod)); + } +} diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java index 2cc5bf15c65a..098161685883 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java @@ -38,11 +38,13 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig; import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; +import org.apache.druid.k8s.overlord.common.JobResponse; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.K8sTestUtils; import org.apache.druid.k8s.overlord.common.KubernetesClientApi; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.common.PeonCommandContext; +import org.apache.druid.k8s.overlord.common.PeonPhase; import org.apache.druid.server.DruidNode; import org.apache.druid.server.log.StartupLoggingConfig; import org.junit.jupiter.api.BeforeEach; @@ -182,8 +184,9 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception assertEquals(task, taskFromPod); - peonClient.waitForPeonJobCompletion(taskId, 2, TimeUnit.MINUTES); + JobResponse jobStatusResult = peonClient.waitForPeonJobCompletion(taskId, 2, TimeUnit.MINUTES); thread.join(); + assertEquals(PeonPhase.SUCCEEDED, jobStatusResult.getPhase()); // as long as there were no exceptions we are good! assertEquals(expectedLogs, actualLogs); // cleanup my job diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml index 741a032eb6c9..30960cdbc668 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedEphemeralOutput.yaml @@ -62,4 +62,4 @@ spec: ephemeral-storage: 1Gi hostname: "id-3e70afe5cd823dfc7dd308eea616426b" restartPolicy: "Never" - ttlSecondsAfterFinished: 3600 \ No newline at end of file + ttlSecondsAfterFinished: 172800 \ No newline at end of file diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml index 73f31ddfb508..70b8b7c1d242 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutput.yaml @@ -105,4 +105,4 @@ spec: name: "graveyard" - emptyDir: {} name: "kubexit" - ttlSecondsAfterFinished: 3600 + ttlSecondsAfterFinished: 172800 diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml index 73f31ddfb508..70b8b7c1d242 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedMultiContainerOutputOrder.yaml @@ -105,4 +105,4 @@ spec: name: "graveyard" - emptyDir: {} name: "kubexit" - ttlSecondsAfterFinished: 3600 + ttlSecondsAfterFinished: 172800 diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml index 004fed9585af..2cef837f3972 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml @@ -16,7 +16,7 @@ metadata: spec: activeDeadlineSeconds: 14400 backoffLimit: 0 - ttlSecondsAfterFinished: 3600 + ttlSecondsAfterFinished: 172800 template: metadata: labels: diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml index b6ca8a2cefe6..cf16c49c5db1 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml @@ -16,7 +16,7 @@ metadata: spec: activeDeadlineSeconds: 14400 backoffLimit: 0 - ttlSecondsAfterFinished: 3600 + ttlSecondsAfterFinished: 172800 template: metadata: labels: diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml index 8ecdaf50b012..d72d0ef37b03 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml @@ -16,7 +16,7 @@ metadata: spec: activeDeadlineSeconds: 14400 backoffLimit: 0 - ttlSecondsAfterFinished: 3600 + ttlSecondsAfterFinished: 172800 template: metadata: labels: diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml index 547887e90847..a230ac913a60 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml @@ -16,7 +16,7 @@ metadata: spec: activeDeadlineSeconds: 14400 backoffLimit: 0 - ttlSecondsAfterFinished: 3600 + ttlSecondsAfterFinished: 172800 template: metadata: labels: diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml index ecd9416c563a..e46de1337883 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedPodSpec.yaml @@ -104,4 +104,4 @@ spec: name: "graveyard" - emptyDir: {} name: "kubexit" - ttlSecondsAfterFinished: 3600 + ttlSecondsAfterFinished: 172800 diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml index 7afc393c56af..f270368fb552 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedSingleContainerOutput.yaml @@ -57,4 +57,4 @@ spec: cpu: "1000m" memory: "2400000000" restartPolicy: "Never" - ttlSecondsAfterFinished: 3600 \ No newline at end of file + ttlSecondsAfterFinished: 172800 \ No newline at end of file diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java index 088169d3a537..f4926864dcbe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java @@ -23,15 +23,11 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Optional; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunner; -/* This class was added for mm-less ingestion in order to let the peon manage its own location lifecycle by submitting -actions to the overlord. https://github.com/apache/druid/pull/15133 moved this location logic to the overlord itself -so this Action is no longer needed. For backwards compatibility with old peons, this class was left in but can be deprecated -for a later druid release. -*/ -@Deprecated public class UpdateLocationAction implements TaskAction { @JsonIgnore @@ -62,6 +58,10 @@ public TypeReference getReturnTypeReference() @Override public Void perform(Task task, TaskActionToolbox toolbox) { + Optional taskRunner = toolbox.getTaskRunner(); + if (taskRunner.isPresent()) { + taskRunner.get().updateLocation(task, taskLocation); + } return null; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index cd17160f3aae..e8770c512dc4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -25,11 +25,13 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.actions.UpdateLocationAction; import org.apache.druid.indexing.common.actions.UpdateStatusAction; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IAE; @@ -40,12 +42,14 @@ import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.segment.indexing.BatchIOConfig; +import org.apache.druid.server.DruidNode; import org.joda.time.Interval; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; @@ -151,6 +155,11 @@ public String setup(TaskToolbox toolbox) throws Exception FileUtils.mkdirp(attemptDir); reportsFile = new File(attemptDir, "report.json"); statusFile = new File(attemptDir, "status.json"); + InetAddress hostName = InetAddress.getLocalHost(); + DruidNode node = toolbox.getTaskExecutorNode(); + toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.create( + hostName.getHostAddress(), node.getPlaintextPort(), node.getTlsPort(), node.isEnablePlaintextPort() + ))); } log.debug("Task setup complete"); return null; @@ -203,6 +212,7 @@ public void cleanUp(TaskToolbox toolbox, @Nullable TaskStatus taskStatus) throws // report back to the overlord UpdateStatusAction status = new UpdateStatusAction("", taskStatusToReport); toolbox.getTaskActionClient().submit(status); + toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.unknown())); if (reportsFile != null && reportsFile.exists()) { toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java index 99b0c05b8323..ac1fd124ef55 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java @@ -141,6 +141,11 @@ default void updateStatus(Task task, TaskStatus status) // do nothing } + default void updateLocation(Task task, TaskLocation location) + { + // do nothing + } + /** * The maximum number of tasks this TaskRunner can run concurrently. * Can return -1 if this method is not implemented or capacity can't be found. diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java new file mode 100644 index 000000000000..83aeb382dc0a --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/UpdateLocationActionTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.google.common.base.Optional; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.TaskRunner; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class UpdateLocationActionTest +{ + @Test + public void testFlow() throws UnknownHostException + { + // get my task location + InetAddress hostName = InetAddress.getLocalHost(); + TaskLocation myLocation = TaskLocation.create(hostName.getHostAddress(), 1, 2); + UpdateLocationAction action = new UpdateLocationAction(myLocation); + Task task = NoopTask.create(); + TaskActionToolbox toolbox = mock(TaskActionToolbox.class); + TaskRunner runner = mock(TaskRunner.class); + when(toolbox.getTaskRunner()).thenReturn(Optional.of(runner)); + action.perform(task, toolbox); + verify(runner, times(1)).updateLocation(eq(task), eq(myLocation)); + } + + @Test + public void testWithNoTaskRunner() throws UnknownHostException + { + // get my task location + InetAddress hostName = InetAddress.getLocalHost(); + TaskLocation myLocation = TaskLocation.create(hostName.getHostAddress(), 1, 2); + UpdateLocationAction action = new UpdateLocationAction(myLocation); + Task task = NoopTask.create(); + TaskActionToolbox toolbox = mock(TaskActionToolbox.class); + TaskRunner runner = mock(TaskRunner.class); + when(toolbox.getTaskRunner()).thenReturn(Optional.absent()); + action.perform(task, toolbox); + verify(runner, never()).updateStatus(any(), any()); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java index bcd2f086fd06..ba210fee228a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java @@ -109,7 +109,7 @@ public String setup(TaskToolbox toolbox) throws Exception task.run(toolbox); // call it 3 times, once to update location in setup, then one for status and location in cleanup - Mockito.verify(taskActionClient, times(1)).submit(any()); + Mockito.verify(taskActionClient, times(3)).submit(any()); verify(pusher, times(1)).pushTaskReports(eq("myID"), any()); verify(pusher, times(1)).pushTaskStatus(eq("myID"), any()); }