Skip to content

Commit

Permalink
Ensure maxRowsMaterialized + Refactoring + Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshat-Jain committed Aug 21, 2024
1 parent 4cd6ff2 commit 8954758
Showing 1 changed file with 36 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final ArrayList<ResultRow> currentBatchOfRacs;

// batchOfRacsPendingProcessing holds the rows read from the frame that are pending processing.
// These get processed when we meet the criteria in processRacsIfNeeded() method.
// These get processed when we meet the criteria in processBatchOfRacsIfNeeded() method.
// Until then, this accumulates rows from currentBatchOfRacs.
private final ArrayList<ResultRow> batchOfRacsPendingProcessing;

Expand Down Expand Up @@ -160,9 +160,15 @@ This is done because we anyway need to run the operators on the entire set of ro
The flow for this scenario can be summarised as following:
1. Frame Reading and Cursor Initialization: We start by reading a frame from the inputChannel and initializing frameCursor to iterate over the rows in that frame.
2. Row Comparison: For each row in the frame, we decide whether it belongs to the same PARTITION BY group as the previous row. This is determined by comparePartitionKeys() method. Please refer to the Javadoc of that method for further details and an example illustration.
2.1. If the PARTITION BY columns of current row matches the PARTITION BY columns of the previous row, they belong to the same PARTITION BY group, and gets added to currentBatchOfRacs.
2.2. If they don't match, then we have reached a partition boundary. In this case, we process the rows so far *if needed* via processRacsIfNeeded() method. Please refer to the Javadoc of that method for further details.
2. Row Comparison: For each row in the frame, we decide whether it belongs to the same PARTITION BY group as the previous row.
This is determined by comparePartitionKeys() method.
Please refer to the Javadoc of that method for further details and an example illustration.
2.1. If the PARTITION BY columns of current row matches the PARTITION BY columns of the previous row,
they belong to the same PARTITION BY group, and gets added to currentBatchOfRacs.
If the number of total rows materialized exceed maxRowsMaterialized, we process the pending batch via processBatchOfRacs() method.
2.2. If they don't match, then we have reached a partition boundary.
In this case, we process the rows so far *if needed* via processBatchOfRacsIfNeeded() method.
Please refer to the Javadoc of that method for further details.
3. End of Input: If the input channel is finished, any remaining rows in currentBatchOfRacs are processed.
*Illustration of Row Comparison step*
Expand Down Expand Up @@ -223,9 +229,7 @@ Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with
} else if (inputChannel.isFinished()) {
// Handle any remaining data.
batchOfRacsPendingProcessing.addAll(currentBatchOfRacs);
if (!batchOfRacsPendingProcessing.isEmpty()) {
runAllOpsOnSingleRac();
}
processBatchOfRacs();
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.runAgain();
Expand All @@ -240,9 +244,13 @@ Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
// Add current row to the same batch of rows for processing.
currentBatchOfRacs.add(currentRow);
if (currentBatchOfRacs.size() + batchOfRacsPendingProcessing.size() > maxRowsMaterialized) {
// We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch.
processBatchOfRacs();
}
ensureMaxRowsInAWindowConstraint(currentBatchOfRacs.size());
} else {
processRacsIfNeeded();
processBatchOfRacsIfNeeded();
outputRow = currentRow.copy();
return ReturnOrAwait.runAgain();
}
Expand All @@ -251,29 +259,6 @@ Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with
return ReturnOrAwait.runAgain();
}

private void runAllOpsOnSingleRac()
{
ensureMaxRowsInAWindowConstraint(batchOfRacsPendingProcessing.size());
RowsAndColumns singleRac = MapOfColumnsRowsAndColumns.fromResultRow(
batchOfRacsPendingProcessing,
frameReader.signature()
);
Operator op = new Operator()
{
@Nullable
@Override
public Closeable goOrContinue(Closeable continuationObject, Receiver receiver)
{
ensureMaxRowsInAWindowConstraint(singleRac.numRows());
receiver.push(singleRac);
receiver.completed();
return null;
}
};
runOperatorsAfterThis(op);
batchOfRacsPendingProcessing.clear();
}

/**
* @param listOfRacs Concat this list of {@link RowsAndColumns} to a {@link ConcatRowsAndColumns} to use as a single input for the operators to be run
*/
Expand Down Expand Up @@ -484,16 +469,34 @@ private void makeRowSupplierFromFrameCursor()
* If we can add the current rows (currentBatchOfRacs) to the rows pending processing (batchOfRacsPendingProcessing) without violating the maxRowsMaterialized constraint, do it.
* If we cannot, we process the rows pending processing (batchOfRacsPendingProcessing), and add the current rows to it which would then be processed in future.
*/
private void processRacsIfNeeded()
private void processBatchOfRacsIfNeeded()
{
if (currentBatchOfRacs.size() + batchOfRacsPendingProcessing.size() > maxRowsMaterialized) {
runAllOpsOnSingleRac();
processBatchOfRacs();
}

batchOfRacsPendingProcessing.addAll(currentBatchOfRacs);
currentBatchOfRacs.clear();
}

/**
* Process {@link #batchOfRacsPendingProcessing}.
*/
private void processBatchOfRacs()
{
if (batchOfRacsPendingProcessing.isEmpty()) {
return;
}
RowsAndColumns singleRac = MapOfColumnsRowsAndColumns.fromResultRow(
batchOfRacsPendingProcessing,
frameReader.signature()
);
ArrayList<RowsAndColumns> rowsAndColumns = new ArrayList<>();
rowsAndColumns.add(singleRac);
runAllOpsOnMultipleRac(rowsAndColumns);
batchOfRacsPendingProcessing.clear();
}

private void ensureMaxRowsInAWindowConstraint(int numRowsInWindow)
{
if (numRowsInWindow > maxRowsMaterialized) {
Expand Down

0 comments on commit 8954758

Please sign in to comment.