Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
LakshSingla committed Jul 25, 2024
1 parent 6cf6838 commit 22e01a7
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,18 @@
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
import org.apache.druid.frame.segment.FrameCursorUtils;
import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.FrameSignaturePair;
import org.apache.druid.query.IterableRowsCursorHelper;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
Expand All @@ -36,11 +45,14 @@
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.column.NullColumn;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class WindowOperatorQueryQueryToolChest extends QueryToolChest<RowsAndColumns, WindowOperatorQuery>
Expand Down Expand Up @@ -116,6 +128,36 @@ public Sequence<Object[]> resultsAsArrays(
return (Sequence) resultSequence;
}

@Override
public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
WindowOperatorQuery query,
Sequence<RowsAndColumns> resultSequence,
MemoryAllocatorFactory memoryAllocatorFactory,
boolean useNestedForUnknownTypes
)
{
RowSignature rowSignature = resultArraySignature(query);
RowSignature modifiedRowSignature = useNestedForUnknownTypes
? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature)
: rowSignature;
FrameCursorUtils.throwIfColumnsHaveUnknownType(modifiedRowSignature);
FrameWriterFactory frameWriterFactory = FrameWriters.makeColumnBasedFrameWriterFactory(
memoryAllocatorFactory,
modifiedRowSignature,
new ArrayList<>()
);
Pair<Cursor, Closeable> cursorAndCloseable = IterableRowsCursorHelper.getCursorFromSequence(
resultsAsArrays(query, resultSequence),
rowSignature
);
Cursor cursor = cursorAndCloseable.lhs;
Closeable closeble = cursorAndCloseable.rhs;

Sequence<Frame> frames = FrameCursorUtils.cursorToFramesSequence(cursor, frameWriterFactory).withBaggage(closeble);

return Optional.of(frames.map(frame -> new FrameSignaturePair(frame, modifiedRowSignature)));
}

/**
* This class exists to unravel the RowsAndColumns that are used in this query and make it the return Sequence
* actually be a Sequence of rows. This is relatively broken in a number of regards, the most obvious of which
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ public void windowQueryTestWithCustomContextMaxSubqueryBytes(String filename) th
.sql(testCase.getSql())
.queryContext(ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true,
PlannerContext.CTX_ENABLE_WINDOW_FNS, true,
QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000"
QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000",
QueryContexts.MAX_SUBQUERY_ROWS_KEY, "0"
)
)
.addCustomVerification(QueryVerification.ofResults(testCase))
Expand Down

0 comments on commit 22e01a7

Please sign in to comment.