Skip to content

Commit

Permalink
Rebase with master + Rename method
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshat-Jain committed Aug 21, 2024
1 parent 22e6e42 commit cca4904
Showing 1 changed file with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final ArrayList<ResultRow> currentBatchOfRacs;

// batchOfRacsPendingProcessing holds the rows read from the frame that are pending processing.
// These get processed when we meet the criteria in processBatchOfRacsIfNeeded() method.
// These get processed when we meet the criteria in processBatchOfRacsPendingProcessingIfNeeded() method.
// Until then, this accumulates rows from currentBatchOfRacs.
private final ArrayList<ResultRow> batchOfRacsPendingProcessing;

Expand Down Expand Up @@ -181,9 +181,9 @@ This is determined by comparePartitionKeys() method.
Please refer to the Javadoc of that method for further details and an example illustration.
2.1. If the PARTITION BY columns of current row matches the PARTITION BY columns of the previous row,
they belong to the same PARTITION BY group, and gets added to currentBatchOfRacs.
If the number of total rows materialized exceed maxRowsMaterialized, we process the pending batch via processBatchOfRacs() method.
If the number of total rows materialized exceed maxRowsMaterialized, we process the pending batch via processBatchOfRacsPendingProcessing() method.
2.2. If they don't match, then we have reached a partition boundary.
In this case, we process the rows so far *if needed* via processBatchOfRacsIfNeeded() method.
In this case, we process the rows so far *if needed* via processBatchOfRacsPendingProcessingIfNeeded() method.
Please refer to the Javadoc of that method for further details.
3. End of Input: If the input channel is finished, any remaining rows in currentBatchOfRacs are processed.
Expand Down Expand Up @@ -245,7 +245,7 @@ Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with
} else if (inputChannel.isFinished()) {
// Handle any remaining data.
batchOfRacsPendingProcessing.addAll(currentBatchOfRacs);
processBatchOfRacs();
processBatchOfRacsPendingProcessing();
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.runAgain();
Expand All @@ -260,13 +260,13 @@ Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
// Add current row to the same batch of rows for processing.
currentBatchOfRacs.add(currentRow);
ensureMaxRowsInAWindowConstraint(currentBatchOfRacs.size());
if (currentBatchOfRacs.size() + batchOfRacsPendingProcessing.size() > maxRowsMaterialized) {
// We don't want to materialize more than maxRowsMaterialized rows at any point in time, so process the pending batch.
processBatchOfRacs();
processBatchOfRacsPendingProcessing();
}
ensureMaxRowsInAWindowConstraint(currentBatchOfRacs.size());
} else {
processBatchOfRacsIfNeeded();
processBatchOfRacsPendingProcessingIfNeeded();
outputRow = currentRow.copy();
return ReturnOrAwait.runAgain();
}
Expand Down Expand Up @@ -488,10 +488,10 @@ private void makeRowSupplierFromFrameCursor()
* If we can add the current rows (currentBatchOfRacs) to the rows pending processing (batchOfRacsPendingProcessing) without violating the maxRowsMaterialized constraint, do it.
* If we cannot, we process the rows pending processing (batchOfRacsPendingProcessing), and add the current rows to it which would then be processed in future.
*/
private void processBatchOfRacsIfNeeded()
private void processBatchOfRacsPendingProcessingIfNeeded()
{
if (currentBatchOfRacs.size() + batchOfRacsPendingProcessing.size() > maxRowsMaterialized) {
processBatchOfRacs();
processBatchOfRacsPendingProcessing();
}

batchOfRacsPendingProcessing.addAll(currentBatchOfRacs);
Expand All @@ -501,7 +501,7 @@ private void processBatchOfRacsIfNeeded()
/**
* Process {@link #batchOfRacsPendingProcessing}.
*/
private void processBatchOfRacs()
private void processBatchOfRacsPendingProcessing()
{
if (batchOfRacsPendingProcessing.isEmpty()) {
return;
Expand Down

0 comments on commit cca4904

Please sign in to comment.