Skip to content

Commit

Permalink
Revert "always set taskLocation (#17350)" (#17417)
Browse files Browse the repository at this point in the history
This reverts commit a664fc8.
  • Loading branch information
georgew5656 authored Oct 29, 2024
1 parent 10208ba commit 66eb365
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.k8s.overlord;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.fabric8.kubernetes.api.model.Pod;
Expand All @@ -32,7 +31,6 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
Expand Down Expand Up @@ -179,11 +177,6 @@ private void writeTaskPayload(Task task) throws IOException
protected synchronized TaskStatus join(long timeout) throws IllegalStateException
{
try {
/* It's okay to store taskLocation because podIP only changes on pod restart, and we have to set restartPolicy to Never
since Druid doesn't support retrying tasks from an external system (K8s). We can explore adding a fabric8 watcher
if we decide we need to change this later.
**/
taskLocation = getTaskLocationFromK8s();
updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);

JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
Expand Down Expand Up @@ -261,8 +254,24 @@ protected TaskLocation getTaskLocation()
if we decide we need to change this later.
**/
if (taskLocation == null) {
log.warn("Unknown task location for [%s]", taskId);
return TaskLocation.unknown();
Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName());
if (!maybePod.isPresent()) {
return TaskLocation.unknown();
}

Pod pod = maybePod.get();
PodStatus podStatus = pod.getStatus();

if (podStatus == null || podStatus.getPodIP() == null) {
return TaskLocation.unknown();
}
taskLocation = TaskLocation.create(
podStatus.getPodIP(),
DruidK8sConstants.PORT,
DruidK8sConstants.TLS_PORT,
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")),
pod.getMetadata() != null ? pod.getMetadata().getName() : ""
);
}

return taskLocation;
Expand Down Expand Up @@ -369,28 +378,4 @@ private void updateState(State[] acceptedStates, State targetState)
);
stateListener.stateChanged(state.get(), taskId.getOriginalTaskId());
}

/**
 * Builds the {@link TaskLocation} of this task from the peon pod returned by
 * {@code kubernetesClient.getPeonPodWithRetries} for the task's K8s job name.
 *
 * The host is the pod IP; the ports come from {@link DruidK8sConstants}. TLS is
 * treated as disabled and the pod name as empty when the pod carries no metadata;
 * TLS is also treated as disabled when the metadata has no annotations.
 *
 * @return the location built from the pod's status and metadata
 * @throws ISE if the pod has no status or the status has no pod IP
 */
@VisibleForTesting
protected TaskLocation getTaskLocationFromK8s()
{
  final Pod peonPod = kubernetesClient.getPeonPodWithRetries(taskId.getK8sJobName());
  final PodStatus status = peonPod.getStatus();

  // Without a pod IP there is nothing to build a location from.
  if (status == null || status.getPodIP() == null) {
    throw new ISE("Could not find location of running task [%s]", taskId);
  }

  // Defaults used when the pod has no metadata (or no annotations).
  String tlsEnabled = "false";
  String podName = "";
  if (peonPod.getMetadata() != null) {
    podName = peonPod.getMetadata().getName();
    if (peonPod.getMetadata().getAnnotations() != null) {
      tlsEnabled = peonPod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false");
    }
  }

  return TaskLocation.create(
      status.getPodIP(),
      DruidK8sConstants.PORT,
      DruidK8sConstants.TLS_PORT,
      Boolean.parseBoolean(tlsEnabled),
      podName
  );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.base.Optional;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodStatus;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.client.dsl.LogWatch;
Expand Down Expand Up @@ -58,7 +57,6 @@
public class KubernetesPeonLifecycleTest extends EasyMockSupport
{
private static final String ID = "id";
private static final String IP = "ip";
private static final TaskStatus SUCCESS = TaskStatus.success(ID);

@Mock KubernetesPeonClient kubernetesClient;
Expand Down Expand Up @@ -288,9 +286,6 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
);
replayAll();

TaskStatus taskStatus = peonLifecycle.join(0L);
Expand Down Expand Up @@ -342,9 +337,7 @@ public void test_join() throws IOException
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
);

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down Expand Up @@ -400,9 +393,7 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
).anyTimes();

replayAll();

TaskStatus taskStatus = peonLifecycle.join(0L);
Expand Down Expand Up @@ -454,9 +445,7 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep
EasyMock.expectLastCall().once();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
);

replayAll();

TaskStatus taskStatus = peonLifecycle.join(0L);
Expand Down Expand Up @@ -504,9 +493,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
);

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down Expand Up @@ -558,9 +545,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
);

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down Expand Up @@ -600,9 +585,7 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatusLike(getPodStatusWithIP()).endStatus().build()
);

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down Expand Up @@ -785,7 +768,7 @@ public void test_getTaskLocation_withPendingTaskState_returnsUnknown()
}

@Test
public void test_getTaskLocation_withRunningTaskState_taskLocationUnset_returnsUnknown()
public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnknown()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
Expand All @@ -797,6 +780,8 @@ public void test_getTaskLocation_withRunningTaskState_taskLocationUnset_returnsU
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);

EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent());

replayAll();

Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
Expand All @@ -805,7 +790,35 @@ public void test_getTaskLocation_withRunningTaskState_taskLocationUnset_returnsU
}

@Test
public void test_getTaskLocationFromK8s()
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithoutStatus_returnsUnknown()
    throws NoSuchFieldException, IllegalAccessException
{
  // Lifecycle under test, forced into the RUNNING state via reflection.
  final KubernetesPeonLifecycle lifecycle =
      new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper, stateListener);
  setPeonLifecycleState(lifecycle, KubernetesPeonLifecycle.State.RUNNING);

  // The pod exists and has metadata, but carries no status (and therefore no pod IP).
  final Pod podWithoutStatus = new PodBuilder().withNewMetadata().withName(ID).endMetadata().build();
  EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName()))
          .andReturn(Optional.of(podWithoutStatus));

  replayAll();

  final TaskLocation actual = lifecycle.getTaskLocation();
  Assert.assertEquals(TaskLocation.unknown(), actual);

  verifyAll();
}

@Test
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_returnsLocation()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
Expand All @@ -826,11 +839,12 @@ public void test_getTaskLocationFromK8s()
.endStatus()
.build();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(pod).once();
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));

replayAll();

TaskLocation location = peonLifecycle.getTaskLocationFromK8s();
TaskLocation location = peonLifecycle.getTaskLocation();

Assert.assertEquals("ip", location.getHost());
Assert.assertEquals(8100, location.getPort());
Assert.assertEquals(-1, location.getTlsPort());
Expand All @@ -840,7 +854,43 @@ public void test_getTaskLocationFromK8s()
}

@Test
public void test_getTaskLocationFromK8s_withPeonPodWithStatusWithTLSAnnotation()
/**
 * Verifies that a task location resolved from the peon pod is saved and handed back
 * on later calls: the pod lookup is expected exactly once even though
 * {@code getTaskLocation()} is called twice, and the second call must return a
 * location equal to the first.
 */
public void test_getTaskLocation_saveTaskLocation()
    throws NoSuchFieldException, IllegalAccessException
{
  KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
      task,
      kubernetesClient,
      taskLogs,
      mapper,
      stateListener
  );
  setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);

  Pod pod = new PodBuilder()
      .withNewMetadata()
      .withName(ID)
      .endMetadata()
      .withNewStatus()
      .withPodIP("ip")
      .endStatus()
      .build();

  // once(): the expectation only covers a single pod lookup across both calls below.
  EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod)).once();

  replayAll();

  TaskLocation location = peonLifecycle.getTaskLocation();
  TaskLocation savedLocation = peonLifecycle.getTaskLocation();

  Assert.assertEquals("ip", location.getHost());
  Assert.assertEquals(8100, location.getPort());
  Assert.assertEquals(-1, location.getTlsPort());
  Assert.assertEquals(ID, location.getK8sPodName());
  // Previously the second result was discarded; check the saved value is what gets returned.
  Assert.assertEquals(location, savedLocation);

  verifyAll();
}

@Test
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithTLSAnnotation_returnsLocation()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
Expand All @@ -862,11 +912,11 @@ public void test_getTaskLocationFromK8s_withPeonPodWithStatusWithTLSAnnotation()
.endStatus()
.build();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(pod).once();
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));

replayAll();

TaskLocation location = peonLifecycle.getTaskLocationFromK8s();
TaskLocation location = peonLifecycle.getTaskLocation();

Assert.assertEquals("ip", location.getHost());
Assert.assertEquals(-1, location.getPort());
Expand All @@ -888,6 +938,7 @@ public void test_getTaskLocation_withStoppedTaskState_returnsUnknown()
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once();

replayAll();
Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
Expand All @@ -901,11 +952,4 @@ private void setPeonLifecycleState(KubernetesPeonLifecycle peonLifecycle, Kubern
stateField.setAccessible(true);
stateField.set(peonLifecycle, new AtomicReference<>(state));
}

/**
 * Creates a {@link PodStatus} whose pod IP is set to the {@code IP} test constant.
 */
private PodStatus getPodStatusWithIP()
{
  final PodStatus statusWithIp = new PodStatus();
  statusWithIp.setPodIP(IP);
  return statusWithIp;
}
}

0 comments on commit 66eb365

Please sign in to comment.