From 654e0b444bfc968ba0ec65a669af6dfef98604cd Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 13 Sep 2024 15:59:41 -0700 Subject: [PATCH] MSQ: Fix two issues with phase transitions. (#17053) 1) ControllerQueryKernel: Update readyToReadResults to acknowledge that sorting stages can go directly from READING_INPUT to RESULTS_READY. 2) WorkerStageKernel: Ignore RESULTS_COMPLETE if work is already finished, which can happen if the transition to FINISHED comes early due to a downstream LIMIT. --- .../druid/msq/kernel/controller/ControllerQueryKernel.java | 7 +++---- .../apache/druid/msq/kernel/worker/WorkerStageKernel.java | 6 ++++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java index 05e0f722ccd4..b01091f9ad7a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java @@ -676,11 +676,10 @@ private boolean readyToReadResults(final StageId stageId, final ControllerStageP { if (stageOutputChannelModes.get(stageId) == OutputChannelMode.MEMORY) { if (getStageDefinition(stageId).doesSortDuringShuffle()) { - // Stages that sort during shuffle go through a READING_INPUT phase followed by a POST_READING phase - // (once all input is read). These stages start producing output once POST_READING starts. - return newPhase == ControllerStagePhase.POST_READING; + // Sorting stages start producing output when they finish reading their input. + return newPhase.isDoneReadingInput(); } else { - // Can read results immediately. + // Non-sorting stages start producing output immediately. return newPhase == ControllerStagePhase.NEW; } } else { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java index b838092ca714..992b90c02859 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java @@ -182,6 +182,12 @@ public void setResultsComplete(Object resultObject) throw new NullPointerException("resultObject must not be null"); } + if (phase.isTerminal()) { + // Ignore RESULTS_COMPLETE if work is already finished. This can happen if we transition to FINISHED early + // due to a downstream stage including a limit. + return; + } + transitionTo(WorkerStagePhase.RESULTS_COMPLETE); this.resultObject = resultObject; }