Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into windowing-fixes-str…
Browse files Browse the repository at this point in the history
…ing-min-max
  • Loading branch information
kgyrtkirk committed Oct 20, 2023
2 parents abe5c06 + fbbb9c7 commit 5257d4a
Show file tree
Hide file tree
Showing 136 changed files with 3,272 additions and 1,205 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/cron-job-its.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ jobs:
- name: Checkout branch
uses: actions/checkout@v3

- name: Setup java
run: export JAVA_HOME=$JAVA_HOME_8_X64
- name: setup java
uses: actions/setup-java@v3
with:
java-version: '8'
distribution: 'zulu'

- name: Cache Maven m2 repository
id: maven
Expand Down
10 changes: 6 additions & 4 deletions .github/workflows/reusable-revised-its.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ jobs:
- name: Checkout branch
uses: actions/checkout@v3

- name: Setup java
run: |
echo "JAVA_HOME=$JAVA_HOME_${{ inputs.build_jdk }}_X64" >> $GITHUB_ENV
- name: setup java
uses: actions/setup-java@v3
with:
java-version: ${{ inputs.build_jdk }}
distribution: 'zulu'

- name: Restore Maven repository
id: maven-restore
Expand Down Expand Up @@ -158,7 +160,7 @@ jobs:
- name: Collect service logs on failure
if: ${{ failure() && steps.run-it.conclusion == 'failure' }}
run: |
tar cvzf ./service-logs.tgz ~/shared/logs
tar cvzf ./service-logs.tgz integration-tests-ex/cases/target/${{ inputs.it }}/logs
- name: Upload Druid service logs to GitHub
if: ${{ failure() && steps.run-it.conclusion == 'failure' }}
Expand Down
8 changes: 5 additions & 3 deletions .github/workflows/reusable-standard-its.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ jobs:
- name: Checkout branch
uses: actions/checkout@v3

- name: Setup java
run: |
echo "JAVA_HOME=$JAVA_HOME_${{ inputs.runtime_jdk }}_X64" >> $GITHUB_ENV
- name: setup java
uses: actions/setup-java@v3
with:
java-version: ${{ inputs.runtime_jdk }}
distribution: 'zulu'

- name: Restore Maven repository
id: maven-restore
Expand Down
9 changes: 6 additions & 3 deletions .github/workflows/standard-its.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ jobs:
strategy:
fail-fast: false
matrix:
jdk: [8, 17]
jdk: [8, 17, 21]
uses: ./.github/workflows/reusable-standard-its.yml
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
with:
Expand Down Expand Up @@ -150,8 +150,11 @@ jobs:
- name: Checkout branch
uses: actions/checkout@v3

- name: Setup java
run: export JAVA_HOME=$JAVA_HOME_8_X64
- name: setup java
uses: actions/setup-java@v3
with:
java-version: '8'
distribution: 'zulu'

# the build step produces SNAPSHOT artifacts into the local maven repository,
# we include github.sha in the cache key to make it specific to that build/jdk
Expand Down
2 changes: 1 addition & 1 deletion examples/bin/run-java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ if [ -z "$JAVA_BIN" ]; then
exit 1
fi

JAVA_MAJOR="$("$JAVA_BIN" -version 2>&1 | sed -n -E 's/.* version "([^."]*).*/\1/p')"
JAVA_MAJOR="$("$JAVA_BIN" -version 2>&1 | sed -n -E 's/.* version "([^."-]*).*/\1/p')"

if [ "$JAVA_MAJOR" != "" ] && [ "$JAVA_MAJOR" -ge "11" ]
then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,4 @@ public void updateStatus(Task task, TaskStatus status)
{
kubernetesTaskRunner.updateStatus(task, status);
}

@Override
public void updateLocation(Task task, TaskLocation location)
{
kubernetesTaskRunner.updateLocation(task, location);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerUtils;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
Expand All @@ -47,6 +50,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -89,6 +94,8 @@ protected enum State
private final KubernetesPeonClient kubernetesClient;
private final ObjectMapper mapper;
private final TaskStateListener stateListener;
private final List<Pair<TaskRunnerListener, Executor>> listeners;

@MonotonicNonNull
private LogWatch logWatch;

Expand All @@ -99,7 +106,8 @@ protected KubernetesPeonLifecycle(
KubernetesPeonClient kubernetesClient,
TaskLogs taskLogs,
ObjectMapper mapper,
TaskStateListener stateListener
TaskStateListener stateListener,
List<Pair<TaskRunnerListener, Executor>> listeners
)
{
this.taskId = new K8sTaskId(task);
Expand All @@ -108,6 +116,7 @@ protected KubernetesPeonLifecycle(
this.taskLogs = taskLogs;
this.mapper = mapper;
this.stateListener = stateListener;
this.listeners = listeners;
}

/**
Expand Down Expand Up @@ -178,7 +187,11 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
{
try {
updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);

TaskRunnerUtils.notifyLocationChanged(
listeners,
task.getId(),
getTaskLocation()
);
JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
taskId,
timeout,
Expand All @@ -190,12 +203,14 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio
finally {
try {
saveLogs();
shutdown();
}
catch (Exception e) {
log.warn(e, "Log processing failed for task [%s]", taskId);
log.warn(e, "Cleanup failed for task [%s]", taskId);
}
finally {
stopTask();
}

stopTask();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.tasklogs.TaskLogs;

import java.util.List;
import java.util.concurrent.Executor;

public class KubernetesPeonLifecycleFactory implements PeonLifecycleFactory
{
private final KubernetesPeonClient client;
Expand All @@ -42,14 +47,15 @@ public KubernetesPeonLifecycleFactory(
}

@Override
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener)
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List<Pair<TaskRunnerListener, Executor>> listeners)
{
return new KubernetesPeonLifecycle(
task,
client,
taskLogs,
mapper,
stateListener
stateListener,
listeners
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,20 @@ public Optional<InputStream> streamTaskLog(String taskid, long offset)
public ListenableFuture<TaskStatus> run(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> {
ListenableFuture<TaskStatus> unused = exec.submit(() -> runTask(task));
return new KubernetesWorkItem(task);
}).getResult();
}
}

protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> {
ListenableFuture<TaskStatus> unused = exec.submit(() -> joinTask(task));
return new KubernetesWorkItem(task);
}).getResult();
}
}

Expand All @@ -172,10 +176,12 @@ private TaskStatus joinTask(Task task)
@VisibleForTesting
protected TaskStatus doTask(Task task, boolean run)
{
TaskStatus taskStatus = TaskStatus.failure(task.getId(), "Task execution never started");
try {
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(
task,
this::emitTaskStateMetrics
this::emitTaskStateMetrics,
listeners
);

synchronized (tasks) {
Expand All @@ -188,7 +194,6 @@ protected TaskStatus doTask(Task task, boolean run)
workItem.setKubernetesPeonLifecycle(peonLifecycle);
}

TaskStatus taskStatus;
if (run) {
taskStatus = peonLifecycle.run(
adapter.fromTask(task),
Expand All @@ -201,15 +206,17 @@ protected TaskStatus doTask(Task task, boolean run)
config.getTaskTimeout().toStandardDuration().getMillis()
);
}

updateStatus(task, taskStatus);

return taskStatus;
}
catch (Exception e) {
log.error(e, "Task [%s] execution caught an exception", task.getId());
taskStatus = TaskStatus.failure(task.getId(), "Could not start task execution");
throw new RuntimeException(e);
}
finally {
updateStatus(task, taskStatus);
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), TaskLocation.unknown());
}
}

@VisibleForTesting
Expand Down Expand Up @@ -242,13 +249,13 @@ protected void emitTaskStateMetrics(KubernetesPeonLifecycle.State state, String
@Override
public void updateStatus(Task task, TaskStatus status)
{
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}
KubernetesWorkItem workItem = tasks.get(task.getId());
if (workItem != null && !workItem.getResult().isDone() && status.isComplete()) {
workItem.setResult(status);
}

@Override
public void updateLocation(Task task, TaskLocation location)
{
TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), location);
// Notify listeners even if the result is set to handle the shutdown case.
TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status);
}

@Override
Expand Down Expand Up @@ -417,6 +424,16 @@ public void registerListener(TaskRunnerListener listener, Executor executor)
final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);
log.debug("Registered listener [%s]", listener.getListenerId());
listeners.add(listenerPair);

for (Map.Entry<String, KubernetesWorkItem> entry : tasks.entrySet()) {
if (entry.getValue().isRunning()) {
TaskRunnerUtils.notifyLocationChanged(
ImmutableList.of(listenerPair),
entry.getKey(),
entry.getValue().getLocation()
);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class KubernetesTaskRunnerConfig
@JsonProperty
@NotNull
// how long to wait for the jobs to be cleaned up.
private Period taskCleanupDelay = new Period("P2D");
private Period taskCleanupDelay = new Period("PT1H");

@JsonProperty
@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

package org.apache.druid.k8s.overlord;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
Expand All @@ -36,9 +37,18 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem
private final Task task;
private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;

public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture)
private final SettableFuture<TaskStatus> result;

public KubernetesWorkItem(Task task)
{
this(task, SettableFuture.create());
}

@VisibleForTesting
public KubernetesWorkItem(Task task, SettableFuture<TaskStatus> result)
{
super(task.getId(), statusFuture);
super(task.getId(), result);
this.result = result;
this.task = task;
}

Expand All @@ -51,7 +61,7 @@ protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle k
protected synchronized void shutdown()
{

if (this.kubernetesPeonLifecycle != null) {
if (this.kubernetesPeonLifecycle != null && !result.isDone()) {
this.kubernetesPeonLifecycle.startWatchingLogs();
this.kubernetesPeonLifecycle.shutdown();
}
Expand Down Expand Up @@ -119,4 +129,9 @@ public Task getTask()
{
return task;
}

public void setResult(TaskStatus status)
{
result.set(status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
package org.apache.druid.k8s.overlord;

import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.java.util.common.Pair;

import java.util.List;
import java.util.concurrent.Executor;

public interface PeonLifecycleFactory
{
KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener);
KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener, List<Pair<TaskRunnerListener, Executor>> listeners);
}
Loading

0 comments on commit 5257d4a

Please sign in to comment.