From 204533cade64bad591b31918ce1ad2b7de03b2c2 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 14 Aug 2024 10:22:19 +0530 Subject: [PATCH] Remove Query ID verification check from MSQ workers (#16886) An upgrade or downgrade involving Druid 30 or any earlier version can fail when the newer version runs a worker task while the older version runs a controller task. This patch removes that verification check until it is safe to add it back. --- .../org/apache/druid/msq/exec/WorkerImpl.java | 17 ++++------------- .../org/apache/druid/msq/kernel/StageId.java | 3 +++ 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 7d2964eb2f8c..912826c5c5a5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -1100,7 +1100,7 @@ public static KernelHolders create(final WorkerContext workerContext, final Clos */ public void addKernel(final WorkerStageKernel kernel) { - final StageId stageId = verifyQueryId(kernel.getWorkOrder().getStageDefinition().getId()); + final StageId stageId = kernel.getWorkOrder().getStageDefinition().getId(); if (holderMap.putIfAbsent(stageId.getStageNumber(), new KernelHolder(kernel)) != null) { // Already added. Do nothing. 
@@ -1116,7 +1116,7 @@ public void addKernel(final WorkerStageKernel kernel) */ public void finishProcessing(final StageId stageId) { - final KernelHolder kernel = holderMap.get(verifyQueryId(stageId).getStageNumber()); + final KernelHolder kernel = holderMap.get(stageId.getStageNumber()); if (kernel != null) { try { @@ -1137,7 +1137,7 @@ public void finishProcessing(final StageId stageId) */ public void removeKernel(final StageId stageId) { - final KernelHolder removed = holderMap.remove(verifyQueryId(stageId).getStageNumber()); + final KernelHolder removed = holderMap.remove(stageId.getStageNumber()); if (removed == null) { throw new ISE("No kernel for stage[%s]", stageId); @@ -1191,7 +1191,7 @@ public int runningKernelCount() @Nullable public WorkerStageKernel getKernelFor(final StageId stageId) { - final KernelHolder holder = holderMap.get(verifyQueryId(stageId).getStageNumber()); + final KernelHolder holder = holderMap.get(stageId.getStageNumber()); if (holder != null) { return holder.kernel; } else { @@ -1240,15 +1240,6 @@ public void setDone() { this.done = true; } - - private StageId verifyQueryId(final StageId stageId) - { - if (!stageId.getQueryId().equals(workerContext.queryId())) { - throw new ISE("Unexpected queryId[%s], expected queryId[%s]", stageId.getQueryId(), workerContext.queryId()); - } - - return stageId; - } } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageId.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageId.java index 5b98eed0da95..a928e5834fc4 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageId.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageId.java @@ -31,6 +31,9 @@ /** * Globally unique stage identifier: query ID plus stage number. + * + * Note: Versions till Druid 30 had a bug in the QueryKits which populated the {@link #queryId} field with random + * UUIDs. 
Therefore, all usages of the field must be vetted instead of assuming that it will be the expected query ID. */ public class StageId implements Comparable {