diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index fd6ae4bd6f18..eaef0cba6a15 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -20,7 +20,6 @@ package org.apache.druid.k8s.overlord; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import io.fabric8.kubernetes.api.model.Pod; @@ -32,7 +31,6 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.k8s.overlord.common.DruidK8sConstants; @@ -179,11 +177,6 @@ private void writeTaskPayload(Task task) throws IOException protected synchronized TaskStatus join(long timeout) throws IllegalStateException { try { - /* It's okay to store taskLocation because podIP only changes on pod restart, and we have to set restartPolicy to Never - since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher - if we decide we need to change this later. - **/ - taskLocation = getTaskLocationFromK8s(); updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING); JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion( @@ -261,8 +254,24 @@ protected TaskLocation getTaskLocation() if we decide we need to change this later. **/ if (taskLocation == null) { - log.warn("Unknown task location for [%s]", taskId); - return TaskLocation.unknown(); + Optional maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName()); + if (!maybePod.isPresent()) { + return TaskLocation.unknown(); + } + + Pod pod = maybePod.get(); + PodStatus podStatus = pod.getStatus(); + + if (podStatus == null || podStatus.getPodIP() == null) { + return TaskLocation.unknown(); + } + taskLocation = TaskLocation.create( + podStatus.getPodIP(), + DruidK8sConstants.PORT, + DruidK8sConstants.TLS_PORT, + Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")), + pod.getMetadata() != null ? pod.getMetadata().getName() : "" + ); } return taskLocation; @@ -369,28 +378,4 @@ private void updateState(State[] acceptedStates, State targetState) ); stateListener.stateChanged(state.get(), taskId.getOriginalTaskId()); } - - @VisibleForTesting - protected TaskLocation getTaskLocationFromK8s() - { - Pod pod = kubernetesClient.getPeonPodWithRetries(taskId.getK8sJobName()); - PodStatus podStatus = pod.getStatus(); - - if (podStatus == null || podStatus.getPodIP() == null) { - throw new ISE("Could not find location of running task [%s]", taskId); - } - - return TaskLocation.create( - podStatus.getPodIP(), - DruidK8sConstants.PORT, - DruidK8sConstants.TLS_PORT, - Boolean.parseBoolean( - pod.getMetadata() != null && pod.getMetadata().getAnnotations() != null ? - pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false") : - "false" - ), - pod.getMetadata() != null ? pod.getMetadata().getName() : "" - ); - - } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 96b58edd1d34..59c3700b1fc1 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -23,7 +23,6 @@ import com.google.common.base.Optional; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; -import io.fabric8.kubernetes.api.model.PodStatus; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; import io.fabric8.kubernetes.client.dsl.LogWatch; @@ -58,7 +57,6 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport { private static final String ID = "id"; - private static final String IP = "ip"; private static final TaskStatus SUCCESS = TaskStatus.success(ID); @Mock KubernetesPeonClient kubernetesClient; @@ -288,9 +286,6 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); - EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn( - new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build() - ); replayAll(); TaskStatus taskStatus = peonLifecycle.join(0L); @@ -342,9 +337,7 @@ public void test_join() throws IOException EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn( - new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build() - ); + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); @@ -400,9 +393,7 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); - EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn( - new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build() - ).anyTimes(); + replayAll(); TaskStatus taskStatus = peonLifecycle.join(0L); @@ -454,9 +445,7 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep EasyMock.expectLastCall().once(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); - EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn( - new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build() - ); + replayAll(); TaskStatus taskStatus = peonLifecycle.join(0L); @@ -504,9 +493,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn( - new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build() - ); + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); @@ -558,9 +545,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn( - new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build() - ); + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); @@ -600,9 +585,7 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr EasyMock.expectLastCall().once(); logWatch.close(); EasyMock.expectLastCall(); - EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn( - new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build() - ); + Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); replayAll(); @@ -785,7 +768,7 @@ public void test_getTaskLocation_withPendingTaskState_returnsUnknown() } @Test - public void test_getTaskLocation_withRunningTaskState_taskLocationUnset_returnsUnknown() + public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnknown() throws NoSuchFieldException, IllegalAccessException { KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( @@ -797,6 +780,8 @@ public void test_getTaskLocation_withRunningTaskState_taskLocationUnset_returnsU ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()); + replayAll(); Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation()); @@ -805,7 +790,35 @@ public void test_getTaskLocation_withRunningTaskState_taskLocationUnset_returnsU } @Test - public void test_getTaskLocationFromK8s() + public void test_getTaskLocation_withRunningTaskState_withPeonPodWithoutStatus_returnsUnknown() + throws NoSuchFieldException, IllegalAccessException + { + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( + task, + kubernetesClient, + taskLogs, + mapper, + stateListener + ); + setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); + + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .build(); + + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod)); + + replayAll(); + + Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation()); + + verifyAll(); + } + + @Test + public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_returnsLocation() throws NoSuchFieldException, IllegalAccessException { KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( @@ -826,11 +839,12 @@ public void test_getTaskLocationFromK8s() .endStatus() .build(); - EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(pod).once(); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod)); replayAll(); - TaskLocation location = peonLifecycle.getTaskLocationFromK8s(); + TaskLocation location = peonLifecycle.getTaskLocation(); + Assert.assertEquals("ip", location.getHost()); Assert.assertEquals(8100, location.getPort()); Assert.assertEquals(-1, location.getTlsPort()); @@ -840,7 +854,43 @@ public void test_getTaskLocationFromK8s() } @Test - public void test_getTaskLocationFromK8s_withPeonPodWithStatusWithTLSAnnotation() + public void test_getTaskLocation_saveTaskLocation() + throws NoSuchFieldException, IllegalAccessException + { + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( + task, + kubernetesClient, + taskLogs, + mapper, + stateListener + ); + setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING); + + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(ID) + .endMetadata() + .withNewStatus() + .withPodIP("ip") + .endStatus() + .build(); + + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod)).once(); + + replayAll(); + + TaskLocation location = peonLifecycle.getTaskLocation(); + peonLifecycle.getTaskLocation(); + Assert.assertEquals("ip", location.getHost()); + Assert.assertEquals(8100, location.getPort()); + Assert.assertEquals(-1, location.getTlsPort()); + Assert.assertEquals(ID, location.getK8sPodName()); + + verifyAll(); + } + + @Test + public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithTLSAnnotation_returnsLocation() throws NoSuchFieldException, IllegalAccessException { KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( @@ -862,11 +912,11 @@ public void test_getTaskLocationFromK8s_withPeonPodWithStatusWithTLSAnnotation() .endStatus() .build(); - EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(pod).once(); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod)); replayAll(); - TaskLocation location = peonLifecycle.getTaskLocationFromK8s(); + TaskLocation location = peonLifecycle.getTaskLocation(); Assert.assertEquals("ip", location.getHost()); Assert.assertEquals(-1, location.getPort()); @@ -888,6 +938,7 @@ public void test_getTaskLocation_withStoppedTaskState_returnsUnknown() stateListener ); setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED); + EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once(); replayAll(); Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation()); @@ -901,11 +952,4 @@ private void setPeonLifecycleState(KubernetesPeonLifecycle peonLifecycle, Kubern stateField.setAccessible(true); stateField.set(peonLifecycle, new AtomicReference<>(state)); } - - private PodStatus getPodStatusWithIP() - { - PodStatus podStatus = new PodStatus(); - podStatus.setPodIP(IP); - return podStatus; - } }