Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
clintropolis committed Jul 31, 2024
1 parent 246dd70 commit ab10354
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorMaker;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.column.RowSignature;

Expand Down Expand Up @@ -80,21 +82,22 @@ public RowSignature getRowSignature()

public Sequence<Object[]> getRowsAsSequence()
{
final Closer closer = Closer.create();
final Sequence<Cursor> cursorSequence =
Sequences.simple(frames)
.map(
frameSignaturePair -> {
Frame frame = frameSignaturePair.getFrame();
RowSignature frameSignature = frameSignaturePair.getRowSignature();
FrameReader frameReader = FrameReader.create(frameSignature);
// currently FrameStorageAdapter cursor maker doesn't have any resources which need closed, but
// if this changes we need to register this cursor maker with a closer as the baggage of the
// sequence
return new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY).asCursorMaker(
CursorBuildSpec.FULL_SCAN
).makeCursor();
final CursorMaker maker = closer.register(
new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY).asCursorMaker(
CursorBuildSpec.FULL_SCAN
)
);
return maker.makeCursor();
}
);
).withBaggage(closer);

return cursorSequence.flatMap(
(cursor) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,7 @@ public CursorMaker asCursorMaker(CursorBuildSpec spec)


if (clauses.isEmpty()) {
// HashJoinEngine isn't vectorized yet.
// However, we can still vectorize if there are no clauses, since that means all we need to do is apply
// a base filter. That's easy enough!
// if there are no clauses, we can just use the base cursor directly if we apply the combined filter
final CursorBuildSpec newSpec = cursorBuildSpecBuilder.setFilter(combinedFilter)
.setVirtualColumns(spec.getVirtualColumns())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.FrameBasedInlineDataSource;
import org.apache.druid.query.FrameSignaturePair;
Expand Down Expand Up @@ -113,6 +114,7 @@ public FrameBasedIndexedTable(
indexBuilders.add(m);
}

final Closer closer = Closer.create();
final Sequence<Cursor> cursors = Sequences.simple(
frameBasedInlineDataSource
.getFrames()
Expand All @@ -122,12 +124,10 @@ public FrameBasedIndexedTable(
RowSignature rowSignature = frameSignaturePair.getRowSignature();
FrameStorageAdapter frameStorageAdapter =
new FrameStorageAdapter(frame, FrameReader.create(rowSignature), Intervals.ETERNITY);
// currently FrameStorageAdapter cursor maker doesn't have any resources which need closed, but if this
// changes we need to register this cursor maker with a closer as the baggage of the sequence
return frameStorageAdapter.asCursorMaker(CursorBuildSpec.FULL_SCAN).makeCursor();
return closer.register(frameStorageAdapter.asCursorMaker(CursorBuildSpec.FULL_SCAN)).makeCursor();
})
.collect(Collectors.toList())
);
).withBaggage(closer);

final Sequence<Integer> sequence = Sequences.map(
cursors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,15 +232,15 @@ public static Sequence<List<Object>> readRowsFromFrameChannel(
{
return new FrameChannelSequence(channel)
.flatMap(
frame ->
readRowsFromCursor(
// if FrameStorageAdapter.asCursorMaker ever needs closing.. this needs to change to add to a
// closer that is tied to baggage of this sequence...
new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
.asCursorMaker(CursorBuildSpec.FULL_SCAN)
.makeCursor(),
frameReader.signature()
)
frame -> {
final CursorMaker maker = new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY).asCursorMaker(
CursorBuildSpec.FULL_SCAN
);
return readRowsFromCursor(
maker.makeCursor(),
frameReader.signature()
).withBaggage(maker);
}
);
}

Expand Down

0 comments on commit ab10354

Please sign in to comment.