diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index ada3facb386d2..77facc5cceba0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -98,7 +98,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor private final ArrayList currentBatchOfRacs; // batchOfRacsPendingProcessing holds the rows read from the frame that are pending processing. - // These get processed when we meet the criteria in processBatchOfRacsIfNeeded() method. + // These get processed when we meet the criteria in processBatchOfRacsPendingProcessingIfNeeded() method. // Until then, this accumulates rows from currentBatchOfRacs. private final ArrayList batchOfRacsPendingProcessing; @@ -181,9 +181,9 @@ This is determined by comparePartitionKeys() method. Please refer to the Javadoc of that method for further details and an example illustration. 2.1. If the PARTITION BY columns of current row matches the PARTITION BY columns of the previous row, they belong to the same PARTITION BY group, and gets added to currentBatchOfRacs. - If the number of total rows materialized exceed maxRowsMaterialized, we process the pending batch via processBatchOfRacs() method. + If the number of total rows materialized exceed maxRowsMaterialized, we process the pending batch via processBatchOfRacsPendingProcessing() method. 2.2. If they don't match, then we have reached a partition boundary. - In this case, we process the rows so far *if needed* via processBatchOfRacsIfNeeded() method. + In this case, we process the rows so far *if needed* via processBatchOfRacsPendingProcessingIfNeeded() method. Please refer to the Javadoc of that method for further details. 3. End of Input: If the input channel is finished, any remaining rows in currentBatchOfRacs are processed. @@ -245,7 +245,7 @@ Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with } else if (inputChannel.isFinished()) { // Handle any remaining data. batchOfRacsPendingProcessing.addAll(currentBatchOfRacs); - processBatchOfRacs(); + processBatchOfRacsPendingProcessing(); return ReturnOrAwait.returnObject(Unit.instance()); } else { return ReturnOrAwait.runAgain(); @@ -260,13 +260,13 @@ Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) { // Add current row to the same batch of rows for processing. currentBatchOfRacs.add(currentRow); + ensureMaxRowsInAWindowConstraint(currentBatchOfRacs.size()); if (currentBatchOfRacs.size() + batchOfRacsPendingProcessing.size() > maxRowsMaterialized) { // We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch. - processBatchOfRacs(); + processBatchOfRacsPendingProcessing(); } - ensureMaxRowsInAWindowConstraint(currentBatchOfRacs.size()); } else { - processBatchOfRacsIfNeeded(); + processBatchOfRacsPendingProcessingIfNeeded(); outputRow = currentRow.copy(); return ReturnOrAwait.runAgain(); } @@ -488,10 +488,10 @@ private void makeRowSupplierFromFrameCursor() * If we can add the current rows (currentBatchOfRacs) to the rows pending processing (batchOfRacsPendingProcessing) without violating the maxRowsMaterialized constraint, do it. * If we cannot, we process the rows pending processing (batchOfRacsPendingProcessing), and add the current rows to it which would then be processed in future. */ - private void processBatchOfRacsIfNeeded() + private void processBatchOfRacsPendingProcessingIfNeeded() { if (currentBatchOfRacs.size() + batchOfRacsPendingProcessing.size() > maxRowsMaterialized) { - processBatchOfRacs(); + processBatchOfRacsPendingProcessing(); } batchOfRacsPendingProcessing.addAll(currentBatchOfRacs); @@ -501,7 +501,7 @@ private void processBatchOfRacsIfNeeded() /** * Process {@link #batchOfRacsPendingProcessing}. */ - private void processBatchOfRacs() + private void processBatchOfRacsPendingProcessing() { if (batchOfRacsPendingProcessing.isEmpty()) { return;