WindowOperatorQueryFrameProcessor: Fix frame writer capacity issues + adhere to FrameProcessor's contract #17209

Merged: 10 commits, Oct 3, 2024
@@ -83,7 +83,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final FrameWriterFactory frameWriterFactory;
private final FrameReader frameReader;
private final int maxRowsMaterialized;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
private Cursor frameCursor = null;
private Supplier<ResultRow> rowSupplierFromFrameCursor;
private ResultRow outputRow = null;
@@ -99,6 +98,8 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final ArrayList<ResultRow> rowsToProcess;
private int lastPartitionIndex = -1;

final AtomicInteger rowId = new AtomicInteger(0);

public WindowOperatorQueryFrameProcessor(
WindowOperatorQuery query,
ReadableFrameChannel inputChannel,
@@ -155,7 +156,7 @@ public List<WritableFrameChannel> outputChannels()
}

@Override
public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException
{
/*
There are 2 scenarios:
@@ -216,32 +217,58 @@ Current approach with R&C and operators materializes a single R&C for processing.
Most window operations, like SUM(), RANK(), RANGE(), etc., can be computed with 2 passes over the data. We might consider reimplementing them in the MSQ way so that we do not have to materialize so much data.
*/

// If there are rows pending flush, flush them and run again before processing any more rows.
if (frameHasRowsPendingFlush()) {
flushAllRowsAndCols(resultRowAndCols, rowId);
return ReturnOrAwait.runAgain();
}

if (partitionColumnNames.isEmpty()) {
// Scenario 1: Query has at least one window function with an OVER() clause without a PARTITION BY.
if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
convertRowFrameToRowsAndColumns(frame);
return ReturnOrAwait.runAgain();
} else if (inputChannel.isFinished()) {
runAllOpsOnMultipleRac(frameRowsAndCols);
}

if (inputChannel.isFinished()) {
// If no rows have been flushed yet, process all rows.
if (rowId.get() == 0) {
runAllOpsOnMultipleRac(frameRowsAndCols);
}

// If there are still rows pending after operations, run again.
if (frameHasRowsPendingFlush()) {
return ReturnOrAwait.runAgain();
}
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.awaitAll(inputChannels().size());
}
return ReturnOrAwait.awaitAll(inputChannels().size());
}

// This helps us avoid OOM issues, as it ensures that we don't keep accumulating processed rows in memory
// if the rate of processing rows is higher than the frame writer's capacity in each iteration of runIncrementally.
clearRACBuffers();

// Scenario 2: All window functions in the query have OVER() clause with a PARTITION BY
if (frameCursor == null || frameCursor.isDone()) {
if (readableInputs.isEmpty()) {
return ReturnOrAwait.awaitAll(1);
} else if (inputChannel.canRead()) {
}

if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
frameCursor = FrameProcessors.makeCursor(frame, frameReader);
makeRowSupplierFromFrameCursor();
} else if (inputChannel.isFinished()) {
// Handle any remaining data.
lastPartitionIndex = rowsToProcess.size() - 1;
processRowsUpToLastPartition();
// If we have some rows pending processing, process them.
// We run again, as it's possible that the frame writer's capacity was reached and some output rows are
// still pending flush to the output channel.
if (!rowsToProcess.isEmpty()) {
lastPartitionIndex = rowsToProcess.size() - 1;
processRowsUpToLastPartition();
return ReturnOrAwait.runAgain();
}
return ReturnOrAwait.returnObject(Unit.instance());
} else {
return ReturnOrAwait.runAgain();
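
For context on the Scenario 2 path above: rows are buffered in rowsToProcess until a partition boundary is known to be complete, and once the input channel is finished, whatever remains buffered forms the last partition. Below is a simplified, self-contained sketch of that batching idea; samePartition and processUpTo are hypothetical stand-ins, not the actual WindowOperatorQueryFrameProcessor methods.

import java.util.ArrayList;
import java.util.List;

class PartitionBatcher<R>
{
  private final List<R> buffer = new ArrayList<>();

  void onRow(R row)
  {
    // A row with a new PARTITION BY key proves every buffered row belongs to
    // a complete partition, so the window operators can safely run over them.
    if (!buffer.isEmpty() && !samePartition(buffer.get(buffer.size() - 1), row)) {
      processUpTo(buffer.size() - 1);
    }
    buffer.add(row);
  }

  void onInputFinished()
  {
    // Input is exhausted: the remaining buffered rows form the final partition.
    processUpTo(buffer.size() - 1);
  }

  boolean samePartition(R a, R b)
  {
    return true; // compare the PARTITION BY columns of the two rows
  }

  void processUpTo(int lastPartitionIndex)
  {
    // run window operators over rows 0..lastPartitionIndex, then drop them
  }
}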
@@ -313,32 +340,23 @@ public Operator.Signal push(RowsAndColumns rac)
public void completed()
{
try {
// resultRowsAndCols has reference to frameRowsAndCols
// due to the chain of calls across the ops
// so we can clear after writing to output
flushAllRowsAndCols(resultRowAndCols);
frameRowsAndCols.clear();

flushAllRowsAndCols(resultRowAndCols, rowId);
}
catch (IOException e) {
throw new RuntimeException(e);
}
finally {
frameRowsAndCols.clear();
resultRowAndCols.clear();
}
}
});
}

/**
* @param resultRowAndCols the list of {@link RowsAndColumns} to flush to a frame
* @param rowId An AtomicInteger representing the current row ID
* @throws IOException
*/
private void flushAllRowsAndCols(ArrayList<RowsAndColumns> resultRowAndCols) throws IOException
private void flushAllRowsAndCols(ArrayList<RowsAndColumns> resultRowAndCols, AtomicInteger rowId) throws IOException
{
RowsAndColumns rac = new ConcatRowsAndColumns(resultRowAndCols);
[Review comment] It's just performance, but the creation of this rac is an O(n) operation regardless of where the rowId stands. That's why it would have been better to pack all of these things into an inner workhorse class; when that is done, this should be taken into account.

AtomicInteger rowId = new AtomicInteger(0);
createFrameWriterIfNeeded(rac, rowId);
writeRacToFrame(rac, rowId);
}
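
The signature change above is the crux of the fix: rowId used to be a method-local AtomicInteger created fresh on each flush, so a flush cut short by a full frame writer restarted from row 0 on the next attempt. Keeping it as a field lets successive runIncrementally() calls resume from the last written row. A minimal sketch of the resumable pattern, with simplified emit semantics (not the real method):

import java.util.concurrent.atomic.AtomicInteger;

class ResumableFlush
{
  // The counter lives on the object, so progress survives across calls.
  private final AtomicInteger rowId = new AtomicInteger(0);

  // Returns true once every row has been emitted; callers re-invoke until then.
  boolean flushSome(int totalRows, int rowsPerCall)
  {
    int written = 0;
    while (rowId.get() < totalRows && written < rowsPerCall) {
      // emit row rowId.get() to the current output frame ...
      rowId.incrementAndGet();
      written++;
    }
    return rowId.get() >= totalRows;
  }
}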
@@ -355,7 +373,6 @@ private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger rowId)
final ColumnSelectorFactory frameWriterColumnSelectorFactoryWithVirtualColumns =
frameWriterVirtualColumns.wrap(frameWriterColumnSelectorFactory);
frameWriter = frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactoryWithVirtualColumns);
currentAllocatorCapacity = frameWriterFactory.allocatorCapacity();
}
}

@@ -367,19 +384,26 @@ private void createFrameWriterIfNeeded(RowsAndColumns rac, AtomicInteger rowId)
public void writeRacToFrame(RowsAndColumns rac, AtomicInteger rowId) throws IOException
{
final int numRows = rac.numRows();
rowId.set(0);
while (rowId.get() < numRows) {
final boolean didAddToFrame = frameWriter.addSelection();
if (didAddToFrame) {
if (frameWriter.addSelection()) {
incrementBoostColumn();
rowId.incrementAndGet();
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 1);
} else if (frameWriter.getNumRows() == 0) {
throw new FrameRowTooLargeException(currentAllocatorCapacity);
} else {
} else if (frameWriter.getNumRows() > 0) {
flushFrameWriter();
return;
createFrameWriterIfNeeded(rac, rowId);

if (frameWriter.addSelection()) {
incrementBoostColumn();
rowId.incrementAndGet();
return;
} else {
throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());
}
} else {
throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());
}
}

flushFrameWriter();
}
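
The rewritten loop above replaces the old behavior with a flush-and-retry-once rule: a row rejected by a non-empty writer just means the current frame is full, so the frame is flushed and the same row is retried against a fresh writer; only a row that fails against an empty writer is genuinely too large. A self-contained sketch of that rule, with a hypothetical Writer standing in for Druid's FrameWriter (not the real interface):

import java.io.IOException;
import java.util.function.Supplier;

class FlushAndRetryOnce
{
  interface Writer
  {
    boolean addRow();                     // false when the current frame has no room left
    int numRows();
    void flushToChannel() throws IOException;
  }

  private final Supplier<Writer> writerFactory;
  private Writer writer;

  FlushAndRetryOnce(Supplier<Writer> writerFactory)
  {
    this.writerFactory = writerFactory;
    this.writer = writerFactory.get();
  }

  void writeOneRow() throws IOException
  {
    if (writer.addRow()) {
      return;                             // row fits in the current frame
    }
    if (writer.numRows() > 0) {
      writer.flushToChannel();            // frame was merely full
      writer = writerFactory.get();       // start a fresh frame
      if (writer.addRow()) {
        return;                           // row fits once the frame is empty
      }
    }
    // The row does not fit even in an empty frame: it is genuinely too large.
    throw new IllegalStateException("row exceeds allocator capacity");
  }
}

Note that the diff also returns out of writeRacToFrame right after a successful retry, so the processor yields back to runIncrementally instead of writing unbounded output in a single call.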

@@ -521,4 +545,28 @@ private void ensureMaxRowsInAWindowConstraint(int numRowsInWindow)
));
}
}

/**
* Increments the value of the partition boosting column. It should be called once the row value has been written
* to the frame.
*/
private void incrementBoostColumn()
{
partitionBoostVirtualColumn.setValue(partitionBoostVirtualColumn.getValue() + 1);
}

/**
* @return true if frame has rows pending flush to the output channel, false otherwise.
*/
private boolean frameHasRowsPendingFlush()
{
return frameWriter != null && frameWriter.getNumRows() > 0;
}

private void clearRACBuffers()
{
frameRowsAndCols.clear();
resultRowAndCols.clear();
rowId.set(0);
}
}
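
Taken together, the new helpers (frameHasRowsPendingFlush, clearRACBuffers) and the persistent rowId turn runIncrementally into a small resumable state machine. Below is a condensed pseudocode rendering in Java syntax of the fixed Scenario 1 control flow; it is not compilable on its own, and all names other than the ReturnOrAwait factory methods refer to members shown in the diff above.

ReturnOrAwait<Object> step() throws IOException
{
  if (frameHasRowsPendingFlush()) {
    flushAllRowsAndCols(resultRowAndCols, rowId);  // bounded work per invocation
    return ReturnOrAwait.runAgain();               // resume flushing next call
  }
  if (inputChannel.canRead()) {
    convertRowFrameToRowsAndColumns(inputChannel.read());
    return ReturnOrAwait.runAgain();
  }
  if (inputChannel.isFinished()) {
    if (rowId.get() == 0) {
      runAllOpsOnMultipleRac(frameRowsAndCols);    // produce output rows
    }
    return frameHasRowsPendingFlush()
        ? ReturnOrAwait.runAgain()
        : ReturnOrAwait.returnObject(Unit.instance());
  }
  return ReturnOrAwait.awaitAll(inputChannels().size());
}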
@@ -66,6 +66,99 @@

public class WindowOperatorQueryFrameProcessorTest extends FrameProcessorTestBase
{
private static final List<Map<String, Object>> INPUT_ROWS = ImmutableList.of(
ImmutableMap.of("added", 1L, "cityName", "city1"),
ImmutableMap.of("added", 1L, "cityName", "city2"),
ImmutableMap.of("added", 2L, "cityName", "city3"),
ImmutableMap.of("added", 2L, "cityName", "city4"),
ImmutableMap.of("added", 2L, "cityName", "city5"),
ImmutableMap.of("added", 3L, "cityName", "city6"),
ImmutableMap.of("added", 3L, "cityName", "city7")
);

@Test
public void testFrameWriterReachingCapacity() throws IOException
{
// This test validates that all output rows are flushed to the output channel by subsequent iterations of
// runIncrementally, even if the frame writer's capacity is reached.
final ReadableInput factChannel = buildWindowTestInputChannel();

RowSignature inputSignature = RowSignature.builder()
.add("cityName", ColumnType.STRING)
.add("added", ColumnType.LONG)
.build();

FrameReader frameReader = FrameReader.create(inputSignature);

RowSignature outputSignature = RowSignature.builder()
.addAll(inputSignature)
.add("w0", ColumnType.LONG)
.build();

final WindowOperatorQuery query = new WindowOperatorQuery(
new QueryDataSource(
Druids.newScanQueryBuilder()
.dataSource(new TableDataSource("test"))
.intervals(new LegacySegmentSpec(Intervals.ETERNITY))
.columns("cityName", "added")
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(new HashMap<>())
.build()),
new LegacySegmentSpec(Intervals.ETERNITY),
new HashMap<>(),
outputSignature,
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
ImmutableList.of()
);

final FrameWriterFactory frameWriterFactory = new LimitedFrameWriterFactory(
FrameWriters.makeRowBasedFrameWriterFactory(
new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
outputSignature,
Collections.emptyList(),
false
),
INPUT_ROWS.size() / 4 // This forces frameWriter's capacity to be reached.
);

final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();
final WindowOperatorQueryFrameProcessor processor = new WindowOperatorQueryFrameProcessor(
query,
factChannel.getChannel(),
outputChannel.writable(),
frameWriterFactory,
frameReader,
new ObjectMapper(),
ImmutableList.of(
new WindowOperatorFactory(new WindowRowNumberProcessor("w0"))
),
inputSignature,
100,
ImmutableList.of("added")
);

exec.runFully(processor, null);

final Sequence<List<Object>> rowsFromProcessor = FrameTestUtil.readRowsFromFrameChannel(
outputChannel.readable(),
FrameReader.create(outputSignature)
);

List<List<Object>> outputRows = rowsFromProcessor.toList();
Assert.assertEquals(INPUT_ROWS.size(), outputRows.size());

for (int i = 0; i < INPUT_ROWS.size(); i++) {
Map<String, Object> inputRow = INPUT_ROWS.get(i);
List<Object> outputRow = outputRows.get(i);

Assert.assertEquals("cityName should match", inputRow.get("cityName"), outputRow.get(0));
Assert.assertEquals("added should match", inputRow.get("added"), outputRow.get(1));
Assert.assertEquals("row_number() should be correct", (long) i + 1, outputRow.get(2));
}
}
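
One easy-to-miss detail in the factory above: INPUT_ROWS.size() / 4 is integer division, so with 7 input rows each frame is capped at a single row, which exercises the capacity path on nearly every write.

int rowsPerFrame = 7 / 4;   // integer division: == 1, so every frame fills after one row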

@Test
public void testBatchingOfPartitionByKeys_singleBatch() throws Exception
{
@@ -195,18 +288,7 @@ private ReadableInput buildWindowTestInputChannel() throws IOException
.add("cityName", ColumnType.STRING)
.add("added", ColumnType.LONG)
.build();

List<Map<String, Object>> rows = ImmutableList.of(
ImmutableMap.of("added", 1L, "cityName", "city1"),
ImmutableMap.of("added", 1L, "cityName", "city2"),
ImmutableMap.of("added", 2L, "cityName", "city3"),
ImmutableMap.of("added", 2L, "cityName", "city4"),
ImmutableMap.of("added", 2L, "cityName", "city5"),
ImmutableMap.of("added", 3L, "cityName", "city6"),
ImmutableMap.of("added", 3L, "cityName", "city7")
);

return makeChannelFromRows(rows, inputSignature, Collections.emptyList());
return makeChannelFromRows(INPUT_ROWS, inputSignature, Collections.emptyList());
}

private ReadableInput makeChannelFromRows(
@@ -0,0 +1,54 @@
!set plannerStrategy DECOUPLED
!use druidtest://?componentSupplier=DrillWindowQueryMSQComponentSupplier
!set outputformat mysql

# This test validates that all output rows are flushed to the output channel even if the frame writer's capacity is reached.

select count(*) as actualNumRows
from (
select countryName, cityName, channel, added, delta, row_number() over() as rowNumber
from wikipedia
group by countryName, cityName, channel, added, delta
);
+---------------+
| actualNumRows |
+---------------+
| 11631 |
+---------------+
(1 row)

!ok

# Validate that all rows are output by the WindowOperatorQueryFrameProcessor layer for the empty over() clause scenario.

select count(*) as numRows, max(rowNumber) as maxRowNumber
from (
select countryName, cityName, channel, added, delta, row_number() over() as rowNumber
from wikipedia
group by countryName, cityName, channel, added, delta
);
+---------+--------------+
| numRows | maxRowNumber |
+---------+--------------+
| 11631 | 11631 |
+---------+--------------+
(1 row)

!ok

# Validate that all rows are output by the WindowOperatorQueryFrameProcessor layer for the non-empty over() clause scenario.

select rowNumber, count(rowNumber) as numRows
from (
select countryName, cityName, channel, added, delta, row_number() over(partition by countryName, cityName, channel, added, delta) as rowNumber
from wikipedia
group by countryName, cityName, channel, added, delta
) group by rowNumber;
+-----------+---------+
| rowNumber | numRows |
+-----------+---------+
| 1 | 11631 |
+-----------+---------+
(1 row)

!ok